You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by lu...@apache.org on 2021/12/16 14:03:24 UTC
[drill] branch master updated: DRILL-8015: Add MongoDB Metastore implementation (#2384)
This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new f8845fe DRILL-8015: Add MongoDB Metastore implementation (#2384)
f8845fe is described below
commit f8845fe2b4cd44c2900bde92b356c47d1530374f
Author: feiteng <32...@qq.com>
AuthorDate: Mon Oct 25 22:38:46 2021 +0800
DRILL-8015: Add MongoDB Metastore implementation (#2384)
---
contrib/storage-mongo/pom.xml | 2 +-
distribution/pom.xml | 5 +
distribution/src/assemble/component.xml | 1 +
.../drill-metastore-override-example.conf | 7 +
.../org/apache/drill/exec/record/SchemaUtil.java | 8 +-
.../java/org/apache/drill/metastore/TestData.java | 2 +-
.../tables/AbstractBasicTablesRequestsTest.java | 2 +-
metastore/mongo-metastore/pom.xml | 71 ++++++++
.../drill/metastore/mongo/MongoMetastore.java | 71 ++++++++
.../metastore/mongo/MongoMetastoreContext.java | 54 ++++++
.../mongo/components/tables/MongoTables.java | 83 +++++++++
.../tables/TablesOperationTransformer.java | 58 ++++++
.../tables/TablesOutputDataTransformer.java | 57 ++++++
.../mongo/components/tables/TablesTransformer.java | 54 ++++++
.../mongo/config/MongoConfigConstants.java | 64 +++++++
.../mongo/exception/MongoMetastoreException.java | 36 ++++
.../drill/metastore/mongo/operate/MongoDelete.java | 49 +++++
.../metastore/mongo/operate/MongoMetadata.java | 32 ++++
.../drill/metastore/mongo/operate/MongoModify.java | 85 +++++++++
.../metastore/mongo/operate/MongoOperation.java | 28 +++
.../drill/metastore/mongo/operate/MongoRead.java | 61 +++++++
.../drill/metastore/mongo/operate/Overwrite.java | 58 ++++++
.../mongo/transform/FilterExpressionVisitor.java | 109 +++++++++++
.../mongo/transform/FilterTransformer.java | 65 +++++++
.../mongo/transform/InputDataTransformer.java | 75 ++++++++
.../mongo/transform/OperationTransformer.java | 58 ++++++
.../mongo/transform/OutputDataTransformer.java | 85 +++++++++
.../metastore/mongo/transform/Transformer.java | 52 ++++++
.../src/main/resources/drill-metastore-module.conf | 21 +++
.../drill/metastore/mongo/MongoBaseTest.java | 201 +++++++++++++++++++++
.../tables/TestMongoBasicTablesRequests.java | 110 +++++++++++
.../tables/TestTablesInputDataTransformer.java | 142 +++++++++++++++
.../tables/TestTablesOperationTransformer.java | 122 +++++++++++++
.../tables/TestTablesOutputDataTransformer.java | 124 +++++++++++++
metastore/pom.xml | 1 +
pom.xml | 1 +
36 files changed, 2050 insertions(+), 4 deletions(-)
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 93d7ad3..34df507 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
- <version>4.3.3</version>
+ <version>${mongo.version}</version>
</dependency>
<!-- Test dependency -->
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 7a2f52d..5fc7d18 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -97,6 +97,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.drill.metastore</groupId>
+ <artifactId>drill-mongo-metastore</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 4c6124f..7421aa3 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -36,6 +36,7 @@
<include>org.apache.drill.metastore:drill-metastore-api:jar</include>
<include>org.apache.drill.metastore:drill-iceberg-metastore:jar</include>
<include>org.apache.drill.metastore:drill-rdbms-metastore:jar</include>
+ <include>org.apache.drill.metastore:drill-mongo-metastore:jar</include>
<include>org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar</include>
<include>org.apache.drill.contrib.storage-hive:drill-hive-exec-shaded:jar</include>
<include>org.apache.drill.contrib.data:tpch-sample-data:jar</include>
diff --git a/distribution/src/main/resources/drill-metastore-override-example.conf b/distribution/src/main/resources/drill-metastore-override-example.conf
index d2742a8..42e860c 100644
--- a/distribution/src/main/resources/drill-metastore-override-example.conf
+++ b/distribution/src/main/resources/drill-metastore-override-example.conf
@@ -21,6 +21,7 @@
drill.metastore: {
# For Drill Iceberg Metastore use: org.apache.drill.metastore.iceberg.IcebergMetastore
+ # For Drill Mongo Metastore use: org.apache.drill.metastore.mongo.MongoMetastore
implementation.class: "org.apache.drill.metastore.rdbms.RdbmsMetastore",
# If implementation class is RdbmsMetastore and no data source config is indicated,
@@ -61,4 +62,10 @@ drill.metastore: {
# relative_path: ${drill.exec.zk.root}"/metastore/iceberg"
}
}
+
+ mongo: {
+ # connection: "mongodb://localhost:27017",
+ # database: "meta",
+ # table_collection: "tables"
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 0a8620f..c68170f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -214,7 +214,13 @@ public class SchemaUtil {
currentNames.add(columnMetadata.name());
result.addAll(getColumnPaths(columnMetadata.tupleSchema(), currentNames));
} else {
- result.add(Collections.singletonList(columnMetadata.name()));
+ if (parentNames != null) {
+ List<String> combinedList = new ArrayList<>(parentNames);
+ combinedList.add(columnMetadata.name());
+ result.add(combinedList);
+ } else {
+ result.add(Collections.singletonList(columnMetadata.name()));
+ }
}
}
return result;
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java
index a638d15..dbc9e56 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java
@@ -55,7 +55,7 @@ public class TestData {
.lastModifiedTime(System.currentTimeMillis())
.partitionKeys(Collections.singletonMap("dir0", "2018"))
.additionalMetadata("additional test metadata")
- .metadataIdentifier("part_int=3/part_varchar=g/0_0_0.parquet")
+ .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
.column("`id`")
.locations(Arrays.asList("/tmp/nation/1", "/tmp/nation/2"))
.partitionValues(Arrays.asList("1", "2"))
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
index 1c6cd4e..aebbc8a 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
@@ -469,7 +469,7 @@ public abstract class AbstractBasicTablesRequestsTest extends BaseTest {
*
* @param tables Drill Metastore Tables instance
*/
- private static void prepareData(Tables tables) {
+ protected static void prepareData(Tables tables) {
TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
nationTable = BaseTableMetadata.builder()
diff --git a/metastore/mongo-metastore/pom.xml b/metastore/mongo-metastore/pom.xml
new file mode 100644
index 0000000..33eeef4
--- /dev/null
+++ b/metastore/mongo-metastore/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>metastore-parent</artifactId>
+ <groupId>org.apache.drill.metastore</groupId>
+ <version>1.20.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>drill-mongo-metastore</artifactId>
+ <name>Drill : Metastore : Mongo</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.metastore</groupId>
+ <artifactId>drill-metastore-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.metastore</groupId>
+ <artifactId>drill-metastore-api</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mongodb</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastore.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastore.java
new file mode 100644
index 0000000..955ac85
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastore.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.Metastore;
+import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.components.views.Views;
+import org.apache.drill.metastore.mongo.components.tables.MongoTables;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+
+/**
+ * Mongo Drill Metastore implementation.
+ */
+public class MongoMetastore implements Metastore {
+
+ private final MongoClient client;
+ private final String database;
+ private final String tableCollection;
+
+ public MongoMetastore(DrillConfig config) {
+ this.client = MongoClients.create(
+ new ConnectionString(config.getString(MongoConfigConstants.CONNECTION)));
+ if (config.hasPath(MongoConfigConstants.DATABASE)) {
+ this.database = config.getString(MongoConfigConstants.DATABASE);
+ } else {
+ this.database = MongoConfigConstants.DEFAULT_DATABASE;
+ }
+ if (config.hasPath(MongoConfigConstants.TABLE_COLLECTION)) {
+ this.tableCollection = config.getString(MongoConfigConstants.TABLE_COLLECTION);
+ } else {
+ this.tableCollection = MongoConfigConstants.DEFAULT_TABLE_COLLECTION;
+ }
+ }
+
+ @Override
+ public Tables tables() {
+ return new MongoTables(
+ client.getDatabase(database).getCollection(tableCollection), client);
+ }
+
+ @Override
+ public Views views() {
+ throw new UnsupportedOperationException("Views metadata support is not implemented");
+ }
+
+ @Override
+ public void close() {
+ if (this.client != null) {
+ this.client.close();
+ }
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastoreContext.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastoreContext.java
new file mode 100644
index 0000000..fced238
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastoreContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import org.apache.drill.metastore.mongo.transform.Transformer;
+import org.bson.Document;
+
+/**
+ * Provides Mongo Metastore component tools to transform, read or write data from / into Mongo collections.
+ *
+ * @param <T> Metastore component unit metadata type
+ */
+public interface MongoMetastoreContext<T> {
+
+ /**
+ * Returns Mongo collection implementation used as storage for Metastore
+ * component data.
+ *
+ * @return Mongo collection instance
+ */
+ MongoCollection<Document> table();
+
+ /**
+ * Returns transformer which allows various
+ * data, filters, operations transformation.
+ *
+ * @return transformer instance
+ */
+ Transformer<T> transformer();
+
+ /**
+ * Returns Mongo client implementation
+ *
+ * @return Mongo client instance
+ */
+ MongoClient client();
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/MongoTables.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/MongoTables.java
new file mode 100644
index 0000000..4f9726f
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/MongoTables.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.components.tables.TablesMetadataTypeValidator;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.operate.MongoMetadata;
+import org.apache.drill.metastore.mongo.operate.MongoModify;
+import org.apache.drill.metastore.mongo.operate.MongoRead;
+import org.apache.drill.metastore.mongo.transform.Transformer;
+import org.apache.drill.metastore.operate.Metadata;
+import org.apache.drill.metastore.operate.Modify;
+import org.apache.drill.metastore.operate.Read;
+import org.bson.Document;
+
+
+/**
+ * Metastore Tables component which stores tables metadata in mongo collection
+ * Provides methods to read and modify tables metadata.
+ */
+public class MongoTables implements Tables, MongoMetastoreContext<TableMetadataUnit> {
+
+ private final MongoClient client;
+ private final MongoCollection<Document> tableCollection;
+
+ public MongoTables(MongoCollection<Document> tableCollection, MongoClient client) {
+ this.tableCollection = tableCollection;
+ this.client = client;
+ }
+
+ public MongoMetastoreContext<TableMetadataUnit> context() {
+ return this;
+ }
+
+ @Override
+ public Metadata metadata() {
+ return new MongoMetadata();
+ }
+
+ @Override
+ public Read<TableMetadataUnit> read() {
+ return new MongoRead<>(TablesMetadataTypeValidator.INSTANCE, context());
+ }
+
+ @Override
+ public Modify<TableMetadataUnit> modify() {
+ return new MongoModify<>(TablesMetadataTypeValidator.INSTANCE, context());
+ }
+
+ @Override
+ public MongoCollection<Document> table() {
+ return tableCollection;
+ }
+
+ @Override
+ public MongoClient client() {
+ return client;
+ }
+
+ @Override
+ public Transformer<TableMetadataUnit> transformer() {
+ return new TablesTransformer(context());
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOperationTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOperationTransformer.java
new file mode 100644
index 0000000..c149e30
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOperationTransformer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.metastore.mongo.operate.Overwrite;
+import org.apache.drill.metastore.mongo.transform.OperationTransformer;
+import org.bson.Document;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Metastore Tables component operations transformer that provides mechanism
+ * to convert {@link TableMetadataUnit} data to Metastore overwrite / delete operations.
+ */
+public class TablesOperationTransformer extends OperationTransformer<TableMetadataUnit> {
+
+ public TablesOperationTransformer(MongoMetastoreContext<TableMetadataUnit> context) {
+ super(context);
+ }
+
+ /**
+ * Groups given list of {@link TableMetadataUnit}, convert them to list of overwrite operations
+ *
+ * @param units Metastore component units
+ * @return list of overwrite operations
+ */
+ @Override
+ public List<Overwrite> toOverwrite(List<TableMetadataUnit> units) {
+ return context.transformer().inputData()
+ .units(units)
+ .execute()
+ .stream()
+ .map(document ->
+ new Overwrite(document,
+ new Document()
+ .append(MongoConfigConstants.ID, document.get(MongoConfigConstants.ID))))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOutputDataTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOutputDataTransformer.java
new file mode 100644
index 0000000..d437ac9
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOutputDataTransformer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.mongo.exception.MongoMetastoreException;
+import org.apache.drill.metastore.mongo.transform.OutputDataTransformer;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metastore Tables component output data transformer that transforms
+ * {@link org.bson.Document} into {@link TableMetadataUnit}.
+ */
+public class TablesOutputDataTransformer extends OutputDataTransformer<TableMetadataUnit> {
+
+ public TablesOutputDataTransformer(Map<String, MethodHandle> unitSetters) {
+ super(unitSetters);
+ }
+
+ @Override
+ public List<TableMetadataUnit> execute() {
+ List<TableMetadataUnit> results = new ArrayList<>();
+ for (Map<MethodHandle, Object> valueToSet : valuesToSet()) {
+ TableMetadataUnit.Builder builder = TableMetadataUnit.builder();
+ for (Map.Entry<MethodHandle, Object> entry : valueToSet.entrySet()) {
+ try {
+ entry.getKey().invokeWithArguments(builder, entry.getValue());
+ } catch (Throwable e) {
+ throw new MongoMetastoreException(
+ String.format("Unable to invoke setter for [%s] using [%s]",
+ TableMetadataUnit.Builder.class.getSimpleName(), entry.getKey()), e);
+ }
+ }
+ results.add(builder.build());
+ }
+ return results;
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesTransformer.java
new file mode 100644
index 0000000..f931a8b
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesTransformer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.transform.InputDataTransformer;
+import org.apache.drill.metastore.mongo.transform.OperationTransformer;
+import org.apache.drill.metastore.mongo.transform.OutputDataTransformer;
+import org.apache.drill.metastore.mongo.transform.Transformer;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+
+/**
+ * Metastore Tables component filter, data and operations transformer.
+ * Provides needed transformations when reading / writing {@link TableMetadataUnit}
+ * from / into Mongo collection.
+ */
+public class TablesTransformer implements Transformer<TableMetadataUnit> {
+
+ private final MongoMetastoreContext<TableMetadataUnit> context;
+
+ public TablesTransformer(MongoMetastoreContext<TableMetadataUnit> context) {
+ this.context = context;
+ }
+
+ @Override
+ public InputDataTransformer<TableMetadataUnit> inputData() {
+ return new InputDataTransformer<>(TableMetadataUnit.SCHEMA.unitGetters());
+ }
+
+ @Override
+ public OutputDataTransformer<TableMetadataUnit> outputData() {
+ return new TablesOutputDataTransformer(TableMetadataUnit.SCHEMA.unitBuilderSetters());
+ }
+
+ @Override
+ public OperationTransformer<TableMetadataUnit> operation() {
+ return new TablesOperationTransformer(context);
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/config/MongoConfigConstants.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/config/MongoConfigConstants.java
new file mode 100644
index 0000000..8c60a33
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/config/MongoConfigConstants.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.config;
+
+import org.apache.drill.metastore.config.MetastoreConfigConstants;
+
+/**
+ * Drill Mongo Metastore configuration which is defined
+ * in {@link MetastoreConfigConstants#MODULE_RESOURCE_FILE_NAME} file.
+ */
+public interface MongoConfigConstants {
+
+ /**
+ * Drill Mongo Metastore configuration properties namespace.
+ */
+ String BASE = MetastoreConfigConstants.BASE + "mongo.";
+
+ /**
+ * Mongo Metastore data source url property. Required.
+ */
+ String CONNECTION = BASE + "connection";
+
+ /**
+ * Database to store meta data. Optional, default is
+ * {@link MongoConfigConstants#DEFAULT_DATABASE}
+ */
+ String DATABASE = BASE + "database";
+
+ /**
+ * Collection to store meta data for tables. Optional, default is
+ * {@link MongoConfigConstants#DEFAULT_TABLE_COLLECTION}
+ */
+ String TABLE_COLLECTION = BASE + "table_collection";
+
+ /**
+ * Default database to store meta data.
+ */
+ String DEFAULT_DATABASE = "meta";
+
+ /**
+ * Default collection to store meta data for tables.
+ */
+ String DEFAULT_TABLE_COLLECTION = "tables";
+
+ /**
+ * Field name used to identify one document uniquely.
+ */
+ String ID = "_id";
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/exception/MongoMetastoreException.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/exception/MongoMetastoreException.java
new file mode 100644
index 0000000..c548aff
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/exception/MongoMetastoreException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.exception;
+
+import org.apache.drill.metastore.exceptions.MetastoreException;
+
+/**
+ * Specific Mongo Drill Metastore runtime exception to indicate exceptions thrown
+ * during Mongo Drill Metastore code execution.
+ */
+public class MongoMetastoreException extends MetastoreException {
+ private static final long serialVersionUID = 0L;
+
+ public MongoMetastoreException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MongoMetastoreException(String message) {
+ super(message);
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoDelete.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoDelete.java
new file mode 100644
index 0000000..3e475d7
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoDelete.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.drill.metastore.mongo.exception.MongoMetastoreException;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Mongo delete operation: deletes documents based on given row filter.
+ */
+public class MongoDelete implements MongoOperation {
+
+ private final Bson filter;
+
+ public MongoDelete(Bson filter) {
+ this.filter = filter;
+ }
+
+ public Bson filter() {
+ return filter;
+ }
+
+ @Override
+ public void execute(MongoCollection<Document> collection) {
+ try {
+ collection.deleteMany(filter);
+ } catch (Exception e) {
+ throw new MongoMetastoreException(String.format("failed to delete by %s" +
+ " from %s", filter.toString(), collection.getNamespace()), e);
+ }
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoMetadata.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoMetadata.java
new file mode 100644
index 0000000..5e54cc9
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoMetadata.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import org.apache.drill.metastore.operate.Metadata;
+
+/**
+ * Implementation of {@link Metadata} interface.
+ * Indicates that Mongo Metastore does not support versioning.
+ */
+public class MongoMetadata implements Metadata {
+
+ @Override
+ public boolean supportsVersioning() {
+ return false;
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoModify.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoModify.java
new file mode 100644
index 0000000..9a5b174
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoModify.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import com.mongodb.client.ClientSession;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.transform.OperationTransformer;
+import org.apache.drill.metastore.operate.AbstractModify;
+import org.apache.drill.metastore.operate.Delete;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.metastore.operate.Modify;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of {@link Modify} interface based on {@link AbstractModify} parent class.
+ * Modifies information in Mongo collection based on given overwrite or delete operations.
+ * Executes given operations in one transaction.
+ *
+ * @param <T> Metastore component unit type
+ */
+public class MongoModify<T> extends AbstractModify<T> {
+ private static final Logger logger = LoggerFactory.getLogger(MongoModify.class);
+
+ private final OperationTransformer<T> transformer;
+ private final MongoMetastoreContext<T> context;
+ private final List<MongoOperation> operations = new ArrayList<>();
+
+ public MongoModify(MetadataTypeValidator metadataTypeValidator, MongoMetastoreContext<T> context) {
+ super(metadataTypeValidator);
+ this.context = context;
+ this.transformer = context.transformer().operation();
+ }
+
+ @Override
+ public void execute() {
+ if (operations.isEmpty()) {
+ return;
+ }
+ executeOperations(operations);
+ }
+
+ @Override
+ public void purge() {
+ executeOperations(Collections.singletonList(transformer.toDeleteAll()));
+ }
+
+ @Override
+ protected void addOverwrite(List<T> units) {
+ operations.addAll(transformer.toOverwrite(units));
+ }
+
+ @Override
+ protected void addDelete(Delete delete) {
+ operations.add(transformer.toDelete(delete));
+ }
+
+ private void executeOperations(List<MongoOperation> operations) {
+ ClientSession clientSession = context.client().startSession();
+ String res = clientSession.withTransaction(() -> {
+ operations.forEach(o -> o.execute(context.table()));
+ return String.format("executed %s operations", operations.size());
+ });
+ logger.debug(res);
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoOperation.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoOperation.java
new file mode 100644
index 0000000..4e14b39
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoOperation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import com.mongodb.client.MongoCollection;
+import org.bson.Document;
+
+/**
+ * Mongo operation
+ */
+public interface MongoOperation {
+ void execute(MongoCollection<Document> collection);
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoRead.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoRead.java
new file mode 100644
index 0000000..a1d7d42
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoRead.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.transform.FilterTransformer;
+import org.apache.drill.metastore.operate.AbstractRead;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.metastore.operate.Read;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link Read} interface based on {@link AbstractRead} parent class.
+ * Reads information from Mongo collection based on given filter expression.
+ * Supports reading information for specific columns.
+ *
+ * @param <T> Metastore component unit type
+ */
+public class MongoRead<T> extends AbstractRead<T> {
+
+ private final MongoMetastoreContext<T> context;
+
+ public MongoRead(MetadataTypeValidator metadataTypeValidator, MongoMetastoreContext<T> context) {
+ super(metadataTypeValidator);
+ this.context = context;
+ }
+
+ @Override
+ protected List<T> internalExecute() {
+ FilterTransformer filterTransformer = context.transformer().filter();
+ Bson rowFilter = filterTransformer.combine(
+ filterTransformer.transform(metadataTypes), filterTransformer.transform(filter));
+ List<Document> documents = Lists.newLinkedList();
+ context.table().find(rowFilter).forEach(documents::add);
+ return context.transformer().outputData()
+ .columns(columns.stream().map(MetastoreColumn::columnName).collect(Collectors.toList()))
+ .documents(documents)
+ .execute();
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/Overwrite.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/Overwrite.java
new file mode 100644
index 0000000..dedf851
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/Overwrite.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.ReplaceOptions;
+import org.apache.drill.metastore.mongo.exception.MongoMetastoreException;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Mongo overwrite operation: overwrites data with given document based on given row filter.
+ */
+public class Overwrite implements MongoOperation {
+
+ private final Bson filter;
+ private final Document data;
+
+ public Overwrite(Document data, Bson filter) {
+ this.data = data;
+ this.filter = filter;
+ }
+
+ public Bson filter() {
+ return filter;
+ }
+
+ public Document data() {
+ return data;
+ }
+
+ @Override
+ public void execute(MongoCollection<Document> collection) {
+ ReplaceOptions replaceOptions = new ReplaceOptions().upsert(true);
+ try {
+ collection.replaceOne(filter, data, replaceOptions);
+ } catch (Exception e) {
+ throw new MongoMetastoreException(
+ String.format("failed to overwrite document by %s into %s",
+ filter.toString(), collection.getNamespace()), e);
+ }
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterExpressionVisitor.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterExpressionVisitor.java
new file mode 100644
index 0000000..eb3b28d
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterExpressionVisitor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import com.mongodb.client.model.Filters;
+import org.apache.drill.metastore.expressions.DoubleExpressionPredicate;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.expressions.IsPredicate;
+import org.apache.drill.metastore.expressions.ListPredicate;
+import org.apache.drill.metastore.expressions.SimplePredicate;
+import org.apache.drill.metastore.expressions.SingleExpressionPredicate;
+import org.bson.conversions.Bson;
+
+/**
+ * Visits {@link FilterExpression} implementations and transforms them into {@link Bson} implementations.
+ */
+public class FilterExpressionVisitor implements FilterExpression.Visitor<Bson> {
+
+ private static final FilterExpressionVisitor INSTANCE = new FilterExpressionVisitor();
+
+ public static FilterExpression.Visitor<Bson> get() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Bson visit(SimplePredicate.Equal<?> expression) {
+ return Filters.eq(expression.column().columnName(), expression.value());
+ }
+
+ @Override
+ public Bson visit(SimplePredicate.NotEqual<?> expression) {
+ return Filters.ne(expression.column().columnName(), expression.value());
+ }
+
+ @Override
+ public Bson visit(SimplePredicate.LessThan<?> expression) {
+ return Filters.lt(expression.column().columnName(), expression.value());
+ }
+
+ @Override
+ public Bson visit(SimplePredicate.LessThanOrEqual<?> expression) {
+ return Filters.lte(expression.column().columnName(), expression.value());
+ }
+
+ @Override
+ public Bson visit(SimplePredicate.GreaterThan<?> expression) {
+ return Filters.gt(expression.column().columnName(), expression.value());
+ }
+
+ @Override
+ public Bson visit(SimplePredicate.GreaterThanOrEqual<?> expression) {
+ return Filters.gte(expression.column().columnName(), expression.value());
+ }
+
+ @Override
+ public Bson visit(ListPredicate.In<?> expression) {
+ return Filters.in(expression.column().columnName(), expression.values());
+ }
+
+ @Override
+ public Bson visit(ListPredicate.NotIn<?> expression) {
+ return Filters.nin(expression.column().columnName(), expression.values());
+ }
+
+ @Override
+ public Bson visit(IsPredicate.IsNull expression) {
+ return Filters.exists(expression.column().columnName(), false);
+ }
+
+ @Override
+ public Bson visit(IsPredicate.IsNotNull expression) {
+ return Filters.exists(expression.column().columnName());
+ }
+
+ @Override
+ public Bson visit(SingleExpressionPredicate.Not expression) {
+ Bson child = expression.expression().accept(this);
+ return Filters.not(child);
+ }
+
+ @Override
+ public Bson visit(DoubleExpressionPredicate.And expression) {
+ Bson right = expression.right().accept(this);
+ Bson left = expression.left().accept(this);
+ return Filters.and(right, left);
+ }
+
+ @Override
+ public Bson visit(DoubleExpressionPredicate.Or expression) {
+ Bson right = expression.right().accept(this);
+ Bson left = expression.left().accept(this);
+ return Filters.or(right, left);
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterTransformer.java
new file mode 100644
index 0000000..abd1d4a
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterTransformer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import com.mongodb.client.model.Filters;
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Transforms given input into Mongo {@link Document} which is used as filter
+ * to retrieve, overwrite or delete Metastore component data.
+ */
+public class FilterTransformer {
+
+ public Bson transform(FilterExpression filter) {
+ return filter == null ? new Document() : filter.accept(FilterExpressionVisitor.get());
+ }
+
+ public Bson transform(Set<MetadataType> metadataTypes) {
+ if (metadataTypes.contains(MetadataType.ALL)) {
+ return new BsonDocument();
+ }
+
+ Set<String> inConditionValues = metadataTypes.stream()
+ .map(Enum::name)
+ .collect(Collectors.toSet());
+
+ if (inConditionValues.size() == 1) {
+ return Filters.eq(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues.iterator().next());
+ }
+ return Filters.in(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues);
+ }
+
+ public Bson combine(Bson... expressions) {
+ if (expressions.length == 0) {
+ return new BsonDocument();
+ }
+ if (expressions.length == 1) {
+ return expressions[0];
+ }
+ return Filters.and(expressions[0], expressions[1]);
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/InputDataTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/InputDataTransformer.java
new file mode 100644
index 0000000..0238952
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/InputDataTransformer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import com.google.gson.Gson;
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.metastore.mongo.exception.MongoMetastoreException;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.bson.Document;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Converts list of Metastore component units into {@link Document}.
+ *
+ * @param <T> Metastore component unit type
+ */
+public class InputDataTransformer<T> {
+ private static final Gson GSON = new Gson();
+ private final List<T> units = new ArrayList<>();
+ private final Map<String, MethodHandle> unitGetters;
+
+ public InputDataTransformer(Map<String, MethodHandle> unitGetters) {
+ this.unitGetters = unitGetters;
+ }
+
+ public InputDataTransformer<T> units(List<T> units) {
+ this.units.addAll(units);
+ return this;
+ }
+
+ public Document createId(Document document) {
+ return Document.parse(GSON.toJson(ImmutableMap.of(
+ MetastoreColumn.STORAGE_PLUGIN.columnName(), document.get(MetastoreColumn.STORAGE_PLUGIN.columnName()),
+ MetastoreColumn.WORKSPACE.columnName(), document.get(MetastoreColumn.WORKSPACE.columnName()),
+ MetastoreColumn.TABLE_NAME.columnName(), document.get(MetastoreColumn.TABLE_NAME.columnName()),
+ MetastoreColumn.METADATA_TYPE.columnName(), document.get(MetastoreColumn.METADATA_TYPE.columnName()),
+ MetastoreColumn.METADATA_IDENTIFIER.columnName(), document.get(MetastoreColumn.METADATA_IDENTIFIER.columnName()))));
+ }
+
+ public List<Document> execute() {
+ return units.stream().map(unit -> {
+ Document document = new Document();
+ for (Map.Entry<String, MethodHandle> entry : unitGetters.entrySet()) {
+ try {
+ document.put(entry.getKey(), entry.getValue().invoke(unit));
+ } catch (Throwable e) {
+ throw new MongoMetastoreException(
+ String.format("Unable to invoke getter for column [%s] using [%s]", entry.getKey(), entry.getValue()), e);
+ }
+ }
+ return document.append(MongoConfigConstants.ID, createId(document));
+ }).collect(Collectors.toList());
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OperationTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OperationTransformer.java
new file mode 100644
index 0000000..405a379
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OperationTransformer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.operate.MongoDelete;
+import org.apache.drill.metastore.mongo.operate.MongoOperation;
+import org.apache.drill.metastore.mongo.operate.Overwrite;
+import org.apache.drill.metastore.operate.Delete;
+
+import java.util.List;
+
+/**
+ * Base class to transforms given input into {@link MongoOperation} implementations.
+ *
+ * @param <T> Metastore component unit type
+ */
+public abstract class OperationTransformer<T> {
+
+ protected final MongoMetastoreContext<T> context;
+
+ protected OperationTransformer(MongoMetastoreContext<T> context) {
+ this.context = context;
+ }
+
+ public MongoDelete toDeleteAll() {
+ return new MongoDelete(context.transformer().filter().transform((FilterExpression) null));
+ }
+
+ public MongoDelete toDelete(Delete delete) {
+ return new MongoDelete(context.transformer().filter().transform(delete.filter()));
+ }
+
+ /**
+ * Converts given list of Metastore components units into list of overwrite operations.
+ * Specific for each Metastore component.
+ *
+ * @param units Metastore component units
+ * @return list of overwrite operations
+ */
+ public abstract List<Overwrite> toOverwrite(List<T> units);
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OutputDataTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OutputDataTransformer.java
new file mode 100644
index 0000000..2681750
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OutputDataTransformer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.bson.Document;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class to convert list of {@link Document}
+ * into Metastore component units for the given list of column names.
+ *
+ * @param <T> Metastore component unit type
+ */
+public abstract class OutputDataTransformer<T> {
+ private final Map<String, MethodHandle> unitSetters;
+ private final List<String> columns = new ArrayList<>();
+ private final List<Document> documents = new ArrayList<>();
+
+ protected OutputDataTransformer(Map<String, MethodHandle> unitSetters) {
+ this.unitSetters = unitSetters;
+ }
+
+ public OutputDataTransformer<T> columns(List<String> columns) {
+ this.columns.addAll(columns);
+ return this;
+ }
+
+ public OutputDataTransformer<T> documents(List<Document> documents) {
+ this.documents.addAll(documents);
+ return this;
+ }
+
+ /**
+ * Converts given list of {@link Document} into Metastore component units.
+ * Specific for each Metastore component.
+ *
+ * @return list of Metastore component units
+ */
+ public abstract List<T> execute();
+
+ /**
+ * For each given record prepares specific methods handler and its value
+ * to be set into Metastore specific component unit.
+ * Ignores absent setters for columns and null values.
+ *
+ * @return list of methods handlers and values to set
+ */
+ protected List<Map<MethodHandle, Object>> valuesToSet() {
+ List<Map<MethodHandle, Object>> results = Lists.newLinkedList();
+ for (Document doc : documents) {
+ Map<MethodHandle, Object> handlerMap = Maps.newHashMap();
+ for (Map.Entry<String, Object> entry : doc.entrySet()) {
+ if (unitSetters.containsKey(entry.getKey())
+ && (columns.isEmpty() || columns.contains(entry.getKey()))) {
+ handlerMap.put(unitSetters.get(entry.getKey()), entry.getValue());
+ }
+ }
+ if (!handlerMap.isEmpty()) {
+ results.add(handlerMap);
+ }
+ }
+ return results;
+ }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/Transformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/Transformer.java
new file mode 100644
index 0000000..bd59371
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/Transformer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+/**
+ * Provides various mechanism implementations to transform filters, data and operations.
+ *
+ * @param <T> component unit type
+ */
+public interface Transformer<T> {
+
+ /**
+ * Creates filter transformer. Since filter transformer does not
+ * depend on specific Metastore component implementation, provides
+ * it as default method.
+ *
+ * @return filter transformer
+ */
+ default FilterTransformer filter() {
+ return new FilterTransformer();
+ }
+
+ /**
+ * @return input data transformer for specific Metastore component
+ */
+ InputDataTransformer<T> inputData();
+
+ /**
+ * @return output data transformer for specific Metastore component
+ */
+ OutputDataTransformer<T> outputData();
+
+ /**
+ * @return operation transformer for specific Metastore component
+ */
+ OperationTransformer<T> operation();
+}
diff --git a/metastore/mongo-metastore/src/main/resources/drill-metastore-module.conf b/metastore/mongo-metastore/src/main/resources/drill-metastore-module.conf
new file mode 100644
index 0000000..1b83410
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/resources/drill-metastore-module.conf
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+drill.metastore.mongo: {
+ connection: "mongodb://localhost:27017"
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/MongoBaseTest.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/MongoBaseTest.java
new file mode 100644
index 0000000..2a7f677
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/MongoBaseTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.MetastoreTest;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.components.tables.AbstractBasicTablesRequestsTest;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.containers.Network;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Category(MetastoreTest.class)
+public class MongoBaseTest extends AbstractBasicTablesRequestsTest {
+ private static final Logger logger = LoggerFactory.getLogger(MongoBaseTest.class);
+
+ private static final String MONGO_IMAGE_NAME = "mongo:4.4.10";
+ private static final int MONGO_PORT = 27017;
+ private static final String CONFIG_SERVER_HOST = "m0";
+ private static final String CONFIG_REPL_SET = "conf";
+ private static final String SHARD_REPL_SET_0 = "shard0";
+ private static final String SHARD_REPL_SET_1 = "shard1";
+ private static final List<GenericContainer<?>> containers = Lists.newArrayList();
+ protected static boolean isShardMode = Boolean.parseBoolean(
+ System.getProperty("drill.mongo.tests.shardMode", "false"));
+
+ @BeforeClass
+ public static void init() throws IOException, InterruptedException {
+ String connectionString;
+ if (isShardMode) {
+ connectionString = initCluster();
+ } else {
+ connectionString = initSingle();
+ }
+ Config config = DrillConfig.create()
+ .withValue(MongoConfigConstants.CONNECTION,
+ ConfigValueFactory.fromAnyRef(connectionString));
+ innerInit(config, MongoMetastore.class);
+ }
+
+ private static String initSingle() {
+ MongoDBContainer container = new MongoDBContainer(MONGO_IMAGE_NAME);
+ container.start();
+ containers.add(container);
+ return String.format("mongodb://%s:%d",
+ container.getContainerIpAddress(), container.getFirstMappedPort());
+ }
+
+ private static String initCluster() throws IOException, InterruptedException {
+ Network network = Network.newNetwork();
+ initConfigServer(network);
+ initShardServers(network);
+ String connectionString = initMongos(network);
+ shardCollection();
+ return connectionString;
+ }
+
+ private static void initConfigServer(Network network) throws IOException,
+ InterruptedException {
+ GenericContainer<?> configServer =
+ newContainer(network, "configsvr", CONFIG_REPL_SET, CONFIG_SERVER_HOST);
+ configServer.start();
+
+ Container.ExecResult execResult = configServer.execInContainer("/bin/bash", "-c",
+ String.format("echo 'rs.initiate({_id: \"%s\", configsvr: true, " +
+ "members: [{ _id : 0, host : \"%s:%s\" }]})' | mongo --port %3$s", CONFIG_REPL_SET, CONFIG_SERVER_HOST, MONGO_PORT));
+ logger.debug(execResult.toString());
+ containers.add(configServer);
+ }
+
+ private static void initShardServers(Network network) throws IOException,
+ InterruptedException {
+ List<GenericContainer<?>> shards = Lists.newArrayList();
+
+ shards.addAll(Stream.of("m1", "m2", "m3")
+ .map(host -> newContainer(network, "shardsvr", SHARD_REPL_SET_0, host))
+ .collect(Collectors.toList()));
+ shards.addAll(Stream.of("m4", "m5", "m6")
+ .map(host -> newContainer(network, "shardsvr", SHARD_REPL_SET_1, host))
+ .collect(Collectors.toList()));
+ shards.forEach(GenericContainer::start);
+
+ Container.ExecResult execResult = shards.get(0).execInContainer("/bin/bash",
+ "-c", String.format("mongo --port %s --eval 'printjson(rs.initiate(" +
+ "{_id:\"%s\",members:[{_id:0,host:\"m1:%1$s\"},{_id:1,host:\"m2:%1$s\"}," +
+ "{_id:2,host:\"m3:%1$s\"}]}))' --quiet",
+ MONGO_PORT, SHARD_REPL_SET_0));
+ logger.debug(execResult.toString());
+ execResult = shards.get(0).execInContainer("/bin/bash", "-c",
+ String.format("until mongo --port %s --eval \"printjson(rs.isMaster())\" " +
+ "| grep ismaster | grep true > /dev/null 2>&1;do sleep 1;done", MONGO_PORT));
+ logger.debug(execResult.toString());
+
+ execResult = shards.get(3).execInContainer("/bin/bash", "-c",
+ String.format("mongo --port %s --eval 'printjson(rs.initiate(" +
+ "{_id:\"%s\",members:[{_id:0,host:\"m4:%1$s\"},{_id:1,host:\"m5:%1$s\"}," +
+ "{_id:2,host:\"m6:%1$s\"}]}))' --quiet",
+ MONGO_PORT, SHARD_REPL_SET_1));
+ logger.debug(execResult.toString());
+ execResult = shards.get(3).execInContainer("/bin/bash", "-c",
+ String.format("until mongo --port %s --eval \"printjson(rs.isMaster())\" " +
+ "| grep ismaster | grep true > /dev/null 2>&1;do sleep 1;done", MONGO_PORT));
+ logger.debug(execResult.toString());
+ containers.addAll(shards);
+ }
+
+ private static String initMongos(Network network) throws IOException,
+ InterruptedException {
+ String mongosHost = "m7";
+ MongoDBContainer mongos = new MongoDBContainer(MONGO_IMAGE_NAME)
+ .withNetwork(network)
+ .withNetworkAliases(mongosHost)
+ .withExposedPorts(MONGO_PORT)
+ .withCommand(String.format("mongos --configdb %s/%s:%s --bind_ip " +
+ "localhost,%s --port %3$s", CONFIG_REPL_SET, CONFIG_SERVER_HOST, MONGO_PORT,
+ mongosHost));
+ mongos.start();
+
+ Container.ExecResult execResult = mongos.execInContainer("/bin/bash", "-c",
+ String.format("echo 'sh.addShard(\"%s/m1,m2,m3\")' | mongo --port %s",
+ SHARD_REPL_SET_0, MONGO_PORT));
+ logger.debug(execResult.toString());
+ execResult = mongos.execInContainer("/bin/bash", "-c",
+ String.format("echo 'sh.addShard(\"%s/m4,m5,m6\")' | mongo --port %s",
+ SHARD_REPL_SET_1, MONGO_PORT));
+ logger.debug(execResult.toString());
+
+ logger.debug("Execute list shards.");
+ execResult = mongos.execInContainer("/bin/bash", "-c", "mongo --eval 'db" +
+ ".adminCommand({ listShards: 1 })' --port " + MONGO_PORT);
+ logger.debug(execResult.toString());
+ containers.add(mongos);
+
+ // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
+ return String.format("mongodb://%s:%s", mongos.getContainerIpAddress(), mongos.getMappedPort(MONGO_PORT));
+ }
+
+ private static void shardCollection() throws IOException, InterruptedException {
+ // Enabled sharding at database level
+ logger.debug("Enabled sharding for database: {}", MongoConfigConstants.DEFAULT_DATABASE);
+ Container.ExecResult execResult = containers.get(containers.size() - 1)
+ .execInContainer("/bin/bash", "-c",
+ String.format("mongo --eval 'db.adminCommand({enableSharding:\"%s\"})'",
+ MongoConfigConstants.DEFAULT_DATABASE));
+ logger.debug(execResult.toString());
+
+ // Shard the collection
+ logger.debug("Shard the collection: {}.{}", MongoConfigConstants.DEFAULT_DATABASE,
+ MongoConfigConstants.DEFAULT_TABLE_COLLECTION);
+ execResult = containers.get(containers.size() - 1)
+ .execInContainer("/bin/bash", "-c",
+ String.format("echo 'sh.shardCollection(\"%s.%s\", {\"%s\" : \"hashed\"})' " +
+ "| mongo ", MongoConfigConstants.DEFAULT_DATABASE,
+ MongoConfigConstants.DEFAULT_TABLE_COLLECTION, MongoConfigConstants.ID));
+ logger.debug(execResult.toString());
+ }
+
+ private static GenericContainer<?> newContainer(Network network, String type,
+ String replSet, String host) {
+ return new GenericContainer<>(MONGO_IMAGE_NAME)
+ .withNetwork(network)
+ .withNetworkAliases(host)
+ .withExposedPorts(MONGO_PORT)
+ .withCommand(String.format("mongod --port %d --%s --replSet %s " +
+ "--bind_ip localhost,%s", MONGO_PORT, type, replSet, host));
+ }
+
+ @AfterClass
+ public static void tearDownCluster() {
+ containers.forEach(GenericContainer::stop);
+ }
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestMongoBasicTablesRequests.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestMongoBasicTablesRequests.java
new file mode 100644
index 0000000..91b4fe7
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestMongoBasicTablesRequests.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import com.clearspring.analytics.util.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.metastore.components.tables.BasicTablesTransformer;
+import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.mongo.MongoBaseTest;
+import org.apache.drill.metastore.operate.Delete;
+import org.apache.drill.metastore.operate.Metadata;
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.base.Joiner;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMongoBasicTablesRequests extends MongoBaseTest {
+
+ @Test
+ public void testMetastoreTableInfoExistingTable() {
+ MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
+ assertTrue(metastoreTableInfo.isExists());
+ assertEquals(nationTableInfo, metastoreTableInfo.tableInfo());
+ assertEquals(nationTable.lastModifiedTime(), metastoreTableInfo.lastModifiedTime());
+ assertEquals(Metadata.UNDEFINED, metastoreTableInfo.metastoreVersion());
+ }
+
+ @Test
+ public void testDelete() {
+ MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
+ assertTrue(metastoreTableInfo.isExists());
+ tables.modify()
+ .delete(Delete.builder()
+ .metadataType(MetadataType.TABLE)
+ .filter(nationTableInfo.toFilter())
+ .build())
+ .execute();
+ metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
+ assertFalse(metastoreTableInfo.isExists());
+
+ List<TableMetadataUnit> res =
+ tables.read().metadataType(MetadataType.ALL).execute();
+ assertFalse(res.isEmpty());
+ tables.modify().purge();
+ res = tables.read().metadataType(MetadataType.ALL).execute();
+ assertTrue(res.isEmpty());
+
+ prepareData(tables);
+ }
+
+ @Test
+ public void testTableMetastoreSchemaParse() {
+ TableMetadataUnit unit = TableMetadataUnit.builder()
+ .storagePlugin("dfs")
+ .workspace("tmp")
+ .tableName("test")
+ .owner("user")
+ .tableType("json")
+ .metadataType(MetadataType.TABLE.name())
+ .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+ .location("/tmp/donuts.json")
+ .schema("{\"type\":\"tuple_schema\",\"columns\":[" +
+ "{\"name\":\"id\",\"type\":\"VARCHAR\",\"mode\":\"OPTIONAL\"}," +
+ "{\"name\":\"type\",\"type\":\"VARCHAR\",\"mode\":\"OPTIONAL\"}," +
+ "{\"name\":\"name\",\"type\":\"VARCHAR\",\"mode\":\"OPTIONAL\"}," +
+ "{\"name\":\"ppu\",\"type\":\"DOUBLE\",\"mode\":\"OPTIONAL\"}," +
+ "{\"name\":\"sales\",\"type\":\"BIGINT\",\"mode\":\"OPTIONAL\"}," +
+ "{\"name\":\"batters\",\"type\":\"STRUCT<`batter` ARRAY<STRUCT<`id` VARCHAR, `type` VARCHAR>>>\",\"mode\":\"REQUIRED\"}," +
+ "{\"name\":\"topping\",\"type\":\"ARRAY<STRUCT<`id` VARCHAR, `type` VARCHAR>>\",\"mode\":\"REPEATED\"}," +
+ "{\"name\":\"filling\",\"type\":\"ARRAY<STRUCT<`id` VARCHAR, `type` VARCHAR>>\",\"mode\":\"REPEATED\"}]}")
+ .lastModifiedTime(System.currentTimeMillis())
+ .columnsStatistics(Maps.newHashMap())
+ .metadataStatistics(Lists.newArrayList())
+ .partitionKeys(Maps.newHashMap())
+ .build();
+ List<SchemaPath> schemaPaths =
+ SchemaUtil.getSchemaPaths(BasicTablesTransformer.tables(Collections.singletonList(unit)).get(0).getSchema());
+ Set<String> schemaSet =
+ schemaPaths.stream().map(SchemaPath::toString).collect(Collectors.toSet());
+ assertEquals(String.format("Schema path is not parsed correctly:%s",
+ Joiner.on(",").join(schemaPaths)), schemaPaths.size(), schemaSet.size());
+ }
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesInputDataTransformer.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesInputDataTransformer.java
new file mode 100644
index 0000000..e533b2d
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesInputDataTransformer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.metastore.mongo.transform.InputDataTransformer;
+import org.bson.Document;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTablesInputDataTransformer {
+ @Test
+ public void testNoData() {
+ List<Document> documents =
+ new InputDataTransformer<TableMetadataUnit>(TableMetadataUnit.SCHEMA.unitGetters())
+ .units(Collections.emptyList())
+ .execute();
+ assertEquals(Collections.emptyList(), documents);
+ }
+
+ @Test
+ public void testValidDataOneRecord() {
+ Map<String, String> partitionKeys = new HashMap<>();
+ partitionKeys.put("dir0", "2018");
+ partitionKeys.put("dir1", "2019");
+ List<String> partitionValues = Arrays.asList("a", "b", "c");
+ Long lastModifiedTime = System.currentTimeMillis();
+
+ TableMetadataUnit unit = TableMetadataUnit.builder()
+ .storagePlugin("dfs")
+ .workspace("tmp")
+ .tableName("nation")
+ .metadataType(MetadataType.TABLE.name())
+ .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
+ .partitionKeys(partitionKeys)
+ .partitionValues(partitionValues)
+ .lastModifiedTime(lastModifiedTime)
+ .build();
+
+ InputDataTransformer<TableMetadataUnit> inputDataTransformer =
+ new InputDataTransformer<>(TableMetadataUnit.SCHEMA.unitGetters());
+ List<Document> documents = inputDataTransformer
+ .units(Collections.singletonList(unit))
+ .execute();
+
+ Document tableRecord = new Document();
+ tableRecord.append("storagePlugin", "dfs");
+ tableRecord.append("workspace", "tmp");
+ tableRecord.append("tableName", "nation");
+ tableRecord.append("metadataType", "TABLE");
+ tableRecord.append("metadataIdentifier", MetadataInfo.GENERAL_INFO_KEY);
+ assertEquals(tableRecord, documents.get(0).get(MongoConfigConstants.ID));
+ assertEquals(partitionKeys, documents.get(0).get("partitionKeys"));
+ assertEquals(partitionValues, documents.get(0).get("partitionValues"));
+ assertEquals(lastModifiedTime, documents.get(0).get("lastModifiedTime"));
+ }
+
+ @Test
+ public void testValidDataSeveralRecords() {
+ List<TableMetadataUnit> units = Arrays.asList(
+ TableMetadataUnit.builder()
+ .storagePlugin("dfs")
+ .workspace("tmp")
+ .tableName("nation")
+ .metadataType(MetadataType.TABLE.name())
+ .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
+ .column("a")
+ .build(),
+ TableMetadataUnit.builder()
+ .storagePlugin("dfs")
+ .workspace("tmp")
+ .tableName("nation")
+ .metadataType(MetadataType.TABLE.name())
+ .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
+ .column("b")
+ .build(),
+ TableMetadataUnit.builder()
+ .storagePlugin("dfs")
+ .workspace("tmp")
+ .tableName("nation")
+ .metadataType(MetadataType.TABLE.name())
+ .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
+ .column("c")
+ .build());
+
+ InputDataTransformer<TableMetadataUnit> inputDataTransformer =
+ new InputDataTransformer<>(TableMetadataUnit.SCHEMA.unitGetters());
+ List<Document> documents = inputDataTransformer
+ .units(units)
+ .execute();
+
+ Document tableRecord1 = new Document();
+ tableRecord1.append("storagePlugin", "dfs");
+ tableRecord1.append("workspace", "tmp");
+ tableRecord1.append("tableName", "nation");
+ tableRecord1.append("metadataType", "TABLE");
+ tableRecord1.append("metadataIdentifier", MetadataInfo.GENERAL_INFO_KEY);
+
+ Document tableRecord2 = new Document();
+ tableRecord2.append("storagePlugin", "dfs");
+ tableRecord2.append("workspace", "tmp");
+ tableRecord2.append("tableName", "nation");
+ tableRecord2.append("metadataType", "TABLE");
+ tableRecord2.append("metadataIdentifier", MetadataInfo.GENERAL_INFO_KEY);
+
+ Document tableRecord3 = new Document();
+ tableRecord3.append("storagePlugin", "dfs");
+ tableRecord3.append("workspace", "tmp");
+ tableRecord3.append("tableName", "nation");
+ tableRecord3.append("metadataType", "TABLE");
+ tableRecord3.append("metadataIdentifier", MetadataInfo.GENERAL_INFO_KEY);
+
+ assertEquals(tableRecord1, documents.get(0).get(MongoConfigConstants.ID));
+ assertEquals(tableRecord2, documents.get(1).get(MongoConfigConstants.ID));
+ assertEquals(tableRecord3, documents.get(2).get(MongoConfigConstants.ID));
+ }
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOperationTransformer.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOperationTransformer.java
new file mode 100644
index 0000000..cde42b4
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOperationTransformer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import com.mongodb.client.model.Filters;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.mongo.MongoMetastore;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.metastore.mongo.operate.MongoDelete;
+import org.apache.drill.metastore.mongo.operate.Overwrite;
+import org.apache.drill.metastore.mongo.transform.InputDataTransformer;
+import org.apache.drill.metastore.mongo.transform.OperationTransformer;
+import org.apache.drill.metastore.operate.Delete;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTablesOperationTransformer {
+
+ private static OperationTransformer<TableMetadataUnit> transformer;
+ private static MongoMetastore metastore;
+
+ @BeforeClass
+ public static void init() {
+ DrillConfig drillConfig = new DrillConfig(DrillConfig.create().withValue(MongoConfigConstants.CONNECTION,
+ ConfigValueFactory.fromAnyRef("mongodb://localhost:27017/?connectTimeoutMS=60000&maxPoolSize=1000&safe=true")));
+ metastore = new MongoMetastore(drillConfig);
+ transformer = new TablesOperationTransformer(((MongoTables) metastore.tables()).context());
+ }
+
+ @Test
+ public void testToOverwriteOperation() {
+ TableMetadataUnit unit = TableMetadataUnit.builder()
+ .storagePlugin("dfs").workspace("tmp").tableName("nation")
+ .metadataType(MetadataType.TABLE.name()).metadataIdentifier("s1").build();
+ List<Overwrite> operations = transformer.toOverwrite(Collections.singletonList(unit));
+ InputDataTransformer<TableMetadataUnit> inputDataTransformer =
+ ((MongoTables) metastore.tables()).transformer().inputData();
+ Document expected = new Document();
+ expected.append("storagePlugin", "dfs");
+ expected.append("workspace", "tmp");
+ expected.append("tableName", "nation");
+ expected.append("metadataType", MetadataType.TABLE.name());
+ expected.append("metadataIdentifier", "s1");
+
+ assertEquals(new Document()
+ .append(MongoConfigConstants.ID, inputDataTransformer.createId(expected)),
+ operations.get(0).filter());
+ assertEquals(expected, operations.get(0).data().get(MongoConfigConstants.ID));
+ }
+
+ @Test
+ public void testToOverwriteOperations() {
+ List<TableMetadataUnit> units = Arrays.asList(
+ TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+ .tableName("nation").metadataType(MetadataType.ROW_GROUP.name())
+ .metadataIdentifier("s1/nation2.parquet/0").build(),
+ TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+ .tableName("nation").metadataType(MetadataType.ROW_GROUP.name())
+ .metadataIdentifier("s1/nation.parquet/0").build(),
+ TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+ .tableName("nation").metadataType(MetadataType.FILE.name())
+ .metadataIdentifier("s1/nation2.parquet").build(),
+ TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+ .tableName("nation").metadataType(MetadataType.FILE.name())
+ .metadataIdentifier("s1/nation.parquet").build(),
+ TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+ .tableName("region").metadataType(MetadataType.SEGMENT.name())
+ .metadataIdentifier("s1").build(),
+ TableMetadataUnit.builder().storagePlugin("s3").workspace("tmp")
+ .tableName("region").metadataType(MetadataType.TABLE.name())
+ .metadataIdentifier("GENERAL_INFO").build());
+
+ List<Overwrite> operations = transformer.toOverwrite(units);
+ assertEquals(6, operations.size());
+ }
+
+ @Test
+ public void testToDeleteOperation() {
+ Bson expected = Filters.and(
+ Filters.eq(MetastoreColumn.STORAGE_PLUGIN.columnName(), "dfs"),
+ Filters.eq(MetastoreColumn.WORKSPACE.columnName(), "tmp"));
+ FilterExpression filter = FilterExpression.and(
+ FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
+ FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp"));
+
+ Delete delete = Delete.builder()
+ .metadataType(MetadataType.ALL)
+ .filter(filter)
+ .build();
+
+ MongoDelete operation = transformer.toDelete(delete);
+ assertEquals(expected.toString(), operation.filter().toString());
+ }
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOutputDataTransformer.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOutputDataTransformer.java
new file mode 100644
index 0000000..52010ac
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOutputDataTransformer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.bson.Document;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTablesOutputDataTransformer {
+
+ private static Map<String, MethodHandle> unitSetters;
+
+ @BeforeClass
+ public static void init() {
+ unitSetters = TableMetadataUnit.SCHEMA.unitBuilderSetters();
+ }
+
+ @Test
+ public void testNoData() {
+ List<TableMetadataUnit> actualResult = new TablesOutputDataTransformer(unitSetters)
+ .columns(Arrays.asList("storagePlugin", "workspace", "tableName"))
+ .documents(Collections.emptyList())
+ .execute();
+
+ assertEquals(Collections.emptyList(), actualResult);
+ }
+
+ @Test
+ public void testValidDataOneRecord() {
+ Map<String, String> partitionKeys = new HashMap<>();
+ partitionKeys.put("dir0", "2018");
+ partitionKeys.put("dir1", "2019");
+ List<String> partitionValues = Arrays.asList("a", "b", "c");
+ Long lastModifiedTime = System.currentTimeMillis();
+
+ Document record = new Document();
+ record.append("storagePlugin", "dfs");
+ record.append("workspace", "tmp");
+ record.append("tableName", "nation");
+ record.append("partitionKeys", partitionKeys);
+ record.append("partitionValues", partitionValues);
+ record.append("lastModifiedTime", lastModifiedTime);
+
+ List<TableMetadataUnit> actualResult = new TablesOutputDataTransformer(unitSetters)
+ .columns(Arrays.asList("storagePlugin", "workspace", "tableName",
+ "partitionKeys", "partitionValues", "lastModifiedTime"))
+ .documents(Collections.singletonList(record))
+ .execute();
+
+ List<TableMetadataUnit> expectedResult = Collections.singletonList(TableMetadataUnit.builder()
+ .storagePlugin("dfs")
+ .workspace("tmp")
+ .tableName("nation")
+ .partitionKeys(partitionKeys)
+ .partitionValues(partitionValues)
+ .lastModifiedTime(lastModifiedTime)
+ .build());
+
+ assertEquals(expectedResult, actualResult);
+ }
+
+ @Test
+ public void testValidDataSeveralRecords() {
+ Document record1 = new Document();
+ record1.append("tableName", "a");
+
+ Document record2 = new Document();
+ record2.append("tableName", "b");
+
+ Document record3 = new Document();
+ record3.append("tableName", "c");
+
+
+ List<TableMetadataUnit> actualResult = new TablesOutputDataTransformer(unitSetters)
+ .columns(Collections.singletonList("tableName"))
+ .documents(Arrays.asList(record1, record2, record3))
+ .execute();
+
+ List<TableMetadataUnit> expectedResult = Arrays.asList(
+ TableMetadataUnit.builder().tableName("a").build(),
+ TableMetadataUnit.builder().tableName("b").build(),
+ TableMetadataUnit.builder().tableName("c").build());
+
+ assertEquals(expectedResult, actualResult);
+ }
+
+ @Test
+ public void testInvalidColumns() {
+ Document record = new Document();
+ record.append("tableName", "a");
+
+ List<TableMetadataUnit> actualResult = new TablesOutputDataTransformer(unitSetters)
+ .documents(Collections.singletonList(record))
+ .columns(Arrays.asList("a", "b"))
+ .execute();
+
+ assertEquals(Collections.emptyList(), actualResult);
+ }
+}
diff --git a/metastore/pom.xml b/metastore/pom.xml
index f962ec3..b5d977e 100644
--- a/metastore/pom.xml
+++ b/metastore/pom.xml
@@ -46,5 +46,6 @@
<module>metastore-api</module>
<module>iceberg-metastore</module>
<module>rdbms-metastore</module>
+ <module>mongo-metastore</module>
</modules>
</project>
diff --git a/pom.xml b/pom.xml
index ed7a0f8..f711ecb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,7 @@
<aircompressor.version>0.20</aircompressor.version>
<iceberg.version>0.12.1</iceberg.version>
<univocity-parsers.version>2.8.3</univocity-parsers.version>
+ <mongo.version>4.3.3</mongo.version>
<junit.args/>
</properties>