You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by lu...@apache.org on 2021/12/16 14:03:24 UTC

[drill] branch master updated: DRILL-8015: Add MongoDB Metastore implementation (#2384)

This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new f8845fe  DRILL-8015: Add MongoDB Metastore implementation (#2384)
f8845fe is described below

commit f8845fe2b4cd44c2900bde92b356c47d1530374f
Author: feiteng <32...@qq.com>
AuthorDate: Mon Oct 25 22:38:46 2021 +0800

    DRILL-8015: Add MongoDB Metastore implementation (#2384)
---
 contrib/storage-mongo/pom.xml                      |   2 +-
 distribution/pom.xml                               |   5 +
 distribution/src/assemble/component.xml            |   1 +
 .../drill-metastore-override-example.conf          |   7 +
 .../org/apache/drill/exec/record/SchemaUtil.java   |   8 +-
 .../java/org/apache/drill/metastore/TestData.java  |   2 +-
 .../tables/AbstractBasicTablesRequestsTest.java    |   2 +-
 metastore/mongo-metastore/pom.xml                  |  71 ++++++++
 .../drill/metastore/mongo/MongoMetastore.java      |  71 ++++++++
 .../metastore/mongo/MongoMetastoreContext.java     |  54 ++++++
 .../mongo/components/tables/MongoTables.java       |  83 +++++++++
 .../tables/TablesOperationTransformer.java         |  58 ++++++
 .../tables/TablesOutputDataTransformer.java        |  57 ++++++
 .../mongo/components/tables/TablesTransformer.java |  54 ++++++
 .../mongo/config/MongoConfigConstants.java         |  64 +++++++
 .../mongo/exception/MongoMetastoreException.java   |  36 ++++
 .../drill/metastore/mongo/operate/MongoDelete.java |  49 +++++
 .../metastore/mongo/operate/MongoMetadata.java     |  32 ++++
 .../drill/metastore/mongo/operate/MongoModify.java |  85 +++++++++
 .../metastore/mongo/operate/MongoOperation.java    |  28 +++
 .../drill/metastore/mongo/operate/MongoRead.java   |  61 +++++++
 .../drill/metastore/mongo/operate/Overwrite.java   |  58 ++++++
 .../mongo/transform/FilterExpressionVisitor.java   | 109 +++++++++++
 .../mongo/transform/FilterTransformer.java         |  65 +++++++
 .../mongo/transform/InputDataTransformer.java      |  75 ++++++++
 .../mongo/transform/OperationTransformer.java      |  58 ++++++
 .../mongo/transform/OutputDataTransformer.java     |  85 +++++++++
 .../metastore/mongo/transform/Transformer.java     |  52 ++++++
 .../src/main/resources/drill-metastore-module.conf |  21 +++
 .../drill/metastore/mongo/MongoBaseTest.java       | 201 +++++++++++++++++++++
 .../tables/TestMongoBasicTablesRequests.java       | 110 +++++++++++
 .../tables/TestTablesInputDataTransformer.java     | 142 +++++++++++++++
 .../tables/TestTablesOperationTransformer.java     | 122 +++++++++++++
 .../tables/TestTablesOutputDataTransformer.java    | 124 +++++++++++++
 metastore/pom.xml                                  |   1 +
 pom.xml                                            |   1 +
 36 files changed, 2050 insertions(+), 4 deletions(-)

diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 93d7ad3..34df507 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -45,7 +45,7 @@
   <dependency>
     <groupId>org.mongodb</groupId>
     <artifactId>mongodb-driver-sync</artifactId>
-    <version>4.3.3</version>
+    <version>${mongo.version}</version>
   </dependency>
 
     <!-- Test dependency -->
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 7a2f52d..5fc7d18 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -97,6 +97,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-mongo-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <exclusions>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 4c6124f..7421aa3 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -36,6 +36,7 @@
         <include>org.apache.drill.metastore:drill-metastore-api:jar</include>
         <include>org.apache.drill.metastore:drill-iceberg-metastore:jar</include>
         <include>org.apache.drill.metastore:drill-rdbms-metastore:jar</include>
+        <include>org.apache.drill.metastore:drill-mongo-metastore:jar</include>
         <include>org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar</include>
         <include>org.apache.drill.contrib.storage-hive:drill-hive-exec-shaded:jar</include>
         <include>org.apache.drill.contrib.data:tpch-sample-data:jar</include>
diff --git a/distribution/src/main/resources/drill-metastore-override-example.conf b/distribution/src/main/resources/drill-metastore-override-example.conf
index d2742a8..42e860c 100644
--- a/distribution/src/main/resources/drill-metastore-override-example.conf
+++ b/distribution/src/main/resources/drill-metastore-override-example.conf
@@ -21,6 +21,7 @@
 
 drill.metastore: {
   # For Drill Iceberg Metastore use: org.apache.drill.metastore.iceberg.IcebergMetastore
+  # For Drill Mongo Metastore use: org.apache.drill.metastore.mongo.MongoMetastore
   implementation.class: "org.apache.drill.metastore.rdbms.RdbmsMetastore",
 
   # If implementation class is RdbmsMetastore and no data source config is indicated,
@@ -61,4 +62,10 @@ drill.metastore: {
       # relative_path: ${drill.exec.zk.root}"/metastore/iceberg"
     }
   }
+
+  mongo: {
+    # connection: "mongodb://localhost:27017",
+    # database: "meta",
+    # table_collection: "tables"
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 0a8620f..c68170f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -214,7 +214,13 @@ public class SchemaUtil {
         currentNames.add(columnMetadata.name());
         result.addAll(getColumnPaths(columnMetadata.tupleSchema(), currentNames));
       } else {
-        result.add(Collections.singletonList(columnMetadata.name()));
+        if (parentNames != null) {
+          List<String> combinedList = new ArrayList<>(parentNames);
+          combinedList.add(columnMetadata.name());
+          result.add(combinedList);
+        } else {
+          result.add(Collections.singletonList(columnMetadata.name()));
+        }
       }
     }
     return result;
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java
index a638d15..dbc9e56 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java
@@ -55,7 +55,7 @@ public class TestData {
       .lastModifiedTime(System.currentTimeMillis())
       .partitionKeys(Collections.singletonMap("dir0", "2018"))
       .additionalMetadata("additional test metadata")
-      .metadataIdentifier("part_int=3/part_varchar=g/0_0_0.parquet")
+      .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
       .column("`id`")
       .locations(Arrays.asList("/tmp/nation/1", "/tmp/nation/2"))
       .partitionValues(Arrays.asList("1", "2"))
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
index 1c6cd4e..aebbc8a 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
@@ -469,7 +469,7 @@ public abstract class AbstractBasicTablesRequestsTest extends BaseTest {
    *
    * @param tables Drill Metastore Tables instance
    */
-  private static void prepareData(Tables tables) {
+  protected static void prepareData(Tables tables) {
     TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
 
     nationTable = BaseTableMetadata.builder()
diff --git a/metastore/mongo-metastore/pom.xml b/metastore/mongo-metastore/pom.xml
new file mode 100644
index 0000000..33eeef4
--- /dev/null
+++ b/metastore/mongo-metastore/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>metastore-parent</artifactId>
+    <groupId>org.apache.drill.metastore</groupId>
+    <version>1.20.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>drill-mongo-metastore</artifactId>
+  <name>Drill : Metastore : Mongo</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-metastore-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongodb-driver-sync</artifactId>
+      <version>${mongo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-metastore-api</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>mongodb</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastore.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastore.java
new file mode 100644
index 0000000..955ac85
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastore.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.Metastore;
+import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.components.views.Views;
+import org.apache.drill.metastore.mongo.components.tables.MongoTables;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+
+/**
+ * Mongo Drill Metastore implementation.
+ */
+public class MongoMetastore implements Metastore {
+
+  private final MongoClient client;
+  private final String database;
+  private final String tableCollection;
+
+  public MongoMetastore(DrillConfig config) {
+    this.client = MongoClients.create(
+      new ConnectionString(config.getString(MongoConfigConstants.CONNECTION)));
+    if (config.hasPath(MongoConfigConstants.DATABASE)) {
+      this.database = config.getString(MongoConfigConstants.DATABASE);
+    } else {
+      this.database = MongoConfigConstants.DEFAULT_DATABASE;
+    }
+    if (config.hasPath(MongoConfigConstants.TABLE_COLLECTION)) {
+      this.tableCollection = config.getString(MongoConfigConstants.TABLE_COLLECTION);
+    } else {
+      this.tableCollection = MongoConfigConstants.DEFAULT_TABLE_COLLECTION;
+    }
+  }
+
+  @Override
+  public Tables tables() {
+    return new MongoTables(
+      client.getDatabase(database).getCollection(tableCollection), client);
+  }
+
+  @Override
+  public Views views() {
+    throw new UnsupportedOperationException("Views metadata support is not implemented");
+  }
+
+  @Override
+  public void close() {
+    if (this.client != null) {
+      this.client.close();
+    }
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastoreContext.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastoreContext.java
new file mode 100644
index 0000000..fced238
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/MongoMetastoreContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import org.apache.drill.metastore.mongo.transform.Transformer;
+import org.bson.Document;
+
+/**
+ * Provides Mongo Metastore component tools to transform, read or write data from / into Mongo collections.
+ *
+ * @param <T> Metastore component unit metadata type
+ */
+public interface MongoMetastoreContext<T> {
+
+  /**
+   * Returns Mongo collection implementation used as storage for Metastore
+   * component data.
+   *
+   * @return Mongo collection instance
+   */
+  MongoCollection<Document> table();
+
+  /**
+   * Returns transformer which allows various
+   * data, filters, operations transformation.
+   *
+   * @return transformer instance
+   */
+  Transformer<T> transformer();
+
+  /**
+   * Returns Mongo client implementation
+   *
+   * @return Mongo client instance
+   */
+  MongoClient client();
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/MongoTables.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/MongoTables.java
new file mode 100644
index 0000000..4f9726f
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/MongoTables.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.components.tables.TablesMetadataTypeValidator;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.operate.MongoMetadata;
+import org.apache.drill.metastore.mongo.operate.MongoModify;
+import org.apache.drill.metastore.mongo.operate.MongoRead;
+import org.apache.drill.metastore.mongo.transform.Transformer;
+import org.apache.drill.metastore.operate.Metadata;
+import org.apache.drill.metastore.operate.Modify;
+import org.apache.drill.metastore.operate.Read;
+import org.bson.Document;
+
+
+/**
+ * Metastore Tables component which stores tables metadata in mongo collection
+ * Provides methods to read and modify tables metadata.
+ */
+public class MongoTables implements Tables, MongoMetastoreContext<TableMetadataUnit> {
+
+  private final MongoClient client;
+  private final MongoCollection<Document> tableCollection;
+
+  public MongoTables(MongoCollection<Document> tableCollection, MongoClient client) {
+    this.tableCollection = tableCollection;
+    this.client = client;
+  }
+
+  public MongoMetastoreContext<TableMetadataUnit> context() {
+    return this;
+  }
+
+  @Override
+  public Metadata metadata() {
+    return new MongoMetadata();
+  }
+
+  @Override
+  public Read<TableMetadataUnit> read() {
+    return new MongoRead<>(TablesMetadataTypeValidator.INSTANCE, context());
+  }
+
+  @Override
+  public Modify<TableMetadataUnit> modify() {
+    return new MongoModify<>(TablesMetadataTypeValidator.INSTANCE, context());
+  }
+
+  @Override
+  public MongoCollection<Document> table() {
+    return tableCollection;
+  }
+
+  @Override
+  public MongoClient client() {
+    return client;
+  }
+
+  @Override
+  public Transformer<TableMetadataUnit> transformer() {
+    return new TablesTransformer(context());
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOperationTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOperationTransformer.java
new file mode 100644
index 0000000..c149e30
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOperationTransformer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.metastore.mongo.operate.Overwrite;
+import org.apache.drill.metastore.mongo.transform.OperationTransformer;
+import org.bson.Document;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Metastore Tables component operations transformer that provides mechanism
+ * to convert {@link TableMetadataUnit} data to Metastore overwrite / delete operations.
+ */
+public class TablesOperationTransformer extends OperationTransformer<TableMetadataUnit> {
+
+  public TablesOperationTransformer(MongoMetastoreContext<TableMetadataUnit> context) {
+    super(context);
+  }
+
+  /**
+   * Groups given list of {@link TableMetadataUnit}, convert them to list of overwrite operations
+   *
+   * @param units Metastore component units
+   * @return list of overwrite operations
+   */
+  @Override
+  public List<Overwrite> toOverwrite(List<TableMetadataUnit> units) {
+    return context.transformer().inputData()
+      .units(units)
+      .execute()
+      .stream()
+      .map(document ->
+        new Overwrite(document,
+          new Document()
+            .append(MongoConfigConstants.ID, document.get(MongoConfigConstants.ID))))
+      .collect(Collectors.toList());
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOutputDataTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOutputDataTransformer.java
new file mode 100644
index 0000000..d437ac9
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesOutputDataTransformer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.mongo.exception.MongoMetastoreException;
+import org.apache.drill.metastore.mongo.transform.OutputDataTransformer;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metastore Tables component output data transformer that transforms
+ * {@link org.bson.Document} into {@link TableMetadataUnit}.
+ */
+public class TablesOutputDataTransformer extends OutputDataTransformer<TableMetadataUnit> {
+
+  public TablesOutputDataTransformer(Map<String, MethodHandle> unitSetters) {
+    super(unitSetters);
+  }
+
+  @Override
+  public List<TableMetadataUnit> execute() {
+    List<TableMetadataUnit> results = new ArrayList<>();
+    for (Map<MethodHandle, Object> valueToSet : valuesToSet()) {
+      TableMetadataUnit.Builder builder = TableMetadataUnit.builder();
+      for (Map.Entry<MethodHandle, Object> entry : valueToSet.entrySet()) {
+        try {
+          entry.getKey().invokeWithArguments(builder, entry.getValue());
+        } catch (Throwable e) {
+          throw new MongoMetastoreException(
+            String.format("Unable to invoke setter for [%s] using [%s]",
+              TableMetadataUnit.Builder.class.getSimpleName(), entry.getKey()), e);
+        }
+      }
+      results.add(builder.build());
+    }
+    return results;
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesTransformer.java
new file mode 100644
index 0000000..f931a8b
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/components/tables/TablesTransformer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.transform.InputDataTransformer;
+import org.apache.drill.metastore.mongo.transform.OperationTransformer;
+import org.apache.drill.metastore.mongo.transform.OutputDataTransformer;
+import org.apache.drill.metastore.mongo.transform.Transformer;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+
+/**
+ * Metastore Tables component filter, data and operations transformer.
+ * Provides needed transformations when reading / writing {@link TableMetadataUnit}
+ * from / into Mongo collection.
+ */
+public class TablesTransformer implements Transformer<TableMetadataUnit> {
+
+  private final MongoMetastoreContext<TableMetadataUnit> context;
+
+  public TablesTransformer(MongoMetastoreContext<TableMetadataUnit> context) {
+    this.context = context;
+  }
+
+  @Override
+  public InputDataTransformer<TableMetadataUnit> inputData() {
+    return new InputDataTransformer<>(TableMetadataUnit.SCHEMA.unitGetters());
+  }
+
+  @Override
+  public OutputDataTransformer<TableMetadataUnit> outputData() {
+    return new TablesOutputDataTransformer(TableMetadataUnit.SCHEMA.unitBuilderSetters());
+  }
+
+  @Override
+  public OperationTransformer<TableMetadataUnit> operation() {
+    return new TablesOperationTransformer(context);
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/config/MongoConfigConstants.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/config/MongoConfigConstants.java
new file mode 100644
index 0000000..8c60a33
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/config/MongoConfigConstants.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.config;
+
+import org.apache.drill.metastore.config.MetastoreConfigConstants;
+
+/**
+ * Drill Mongo Metastore configuration which is defined
+ * in {@link MetastoreConfigConstants#MODULE_RESOURCE_FILE_NAME} file.
+ */
+public interface MongoConfigConstants {
+
+  /**
+   * Drill Mongo Metastore configuration properties namespace.
+   */
+  String BASE = MetastoreConfigConstants.BASE + "mongo.";
+
+  /**
+   * Mongo Metastore data source url property. Required.
+   */
+  String CONNECTION = BASE + "connection";
+
+  /**
+   * Database to store meta data. Optional, default is
+   * {@link MongoConfigConstants#DEFAULT_DATABASE}
+   */
+  String DATABASE = BASE + "database";
+
+  /**
+   * Collection to store meta data for tables. Optional, default is
+   * {@link MongoConfigConstants#DEFAULT_TABLE_COLLECTION}
+   */
+  String TABLE_COLLECTION = BASE + "table_collection";
+
+  /**
+   * Default database to store meta data.
+   */
+  String DEFAULT_DATABASE = "meta";
+
+  /**
+   * Default collection to store meta data for tables.
+   */
+  String DEFAULT_TABLE_COLLECTION = "tables";
+
+  /**
+   * Field name used to identify one document uniquely.
+   */
+  String ID = "_id";
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/exception/MongoMetastoreException.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/exception/MongoMetastoreException.java
new file mode 100644
index 0000000..c548aff
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/exception/MongoMetastoreException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.exception;
+
+import org.apache.drill.metastore.exceptions.MetastoreException;
+
+/**
+ * Specific Mongo Drill Metastore runtime exception to indicate exceptions thrown
+ * during Mongo Drill Metastore code execution.
+ */
+public class MongoMetastoreException extends MetastoreException {
+  private static final long serialVersionUID = 0L;
+
+  public MongoMetastoreException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public MongoMetastoreException(String message) {
+    super(message);
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoDelete.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoDelete.java
new file mode 100644
index 0000000..3e475d7
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoDelete.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.drill.metastore.mongo.exception.MongoMetastoreException;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Mongo delete operation: deletes documents based on given row filter.
+ */
+public class MongoDelete implements MongoOperation {
+
+  private final Bson filter;
+
+  public MongoDelete(Bson filter) {
+    this.filter = filter;
+  }
+
+  public Bson filter() {
+    return filter;
+  }
+
+  @Override
+  public void execute(MongoCollection<Document> collection) {
+    try {
+      collection.deleteMany(filter);
+    } catch (Exception e) {
+      throw new MongoMetastoreException(String.format("failed to delete by %s" +
+        " from %s", filter.toString(), collection.getNamespace()), e);
+    }
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoMetadata.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoMetadata.java
new file mode 100644
index 0000000..5e54cc9
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoMetadata.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import org.apache.drill.metastore.operate.Metadata;
+
+/**
+ * Implementation of {@link Metadata} interface.
+ * Indicates that Mongo Metastore does not support versioning.
+ */
+public class MongoMetadata implements Metadata {
+
+  @Override
+  public boolean supportsVersioning() {
+    return false;
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoModify.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoModify.java
new file mode 100644
index 0000000..9a5b174
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoModify.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import com.mongodb.client.ClientSession;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.transform.OperationTransformer;
+import org.apache.drill.metastore.operate.AbstractModify;
+import org.apache.drill.metastore.operate.Delete;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.metastore.operate.Modify;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of {@link Modify} interface based on {@link AbstractModify} parent class.
+ * Modifies information in Mongo collection based on given overwrite or delete operations.
+ * Executes given operations in one transaction.
+ *
+ * @param <T> Metastore component unit type
+ */
+public class MongoModify<T> extends AbstractModify<T> {
+  private static final Logger logger = LoggerFactory.getLogger(MongoModify.class);
+
+  private final OperationTransformer<T> transformer;
+  private final MongoMetastoreContext<T> context;
+  private final List<MongoOperation> operations = new ArrayList<>();
+
+  public MongoModify(MetadataTypeValidator metadataTypeValidator, MongoMetastoreContext<T> context) {
+    super(metadataTypeValidator);
+    this.context = context;
+    this.transformer = context.transformer().operation();
+  }
+
+  @Override
+  public void execute() {
+    if (operations.isEmpty()) {
+      return;
+    }
+    executeOperations(operations);
+  }
+
+  @Override
+  public void purge() {
+    executeOperations(Collections.singletonList(transformer.toDeleteAll()));
+  }
+
+  @Override
+  protected void addOverwrite(List<T> units) {
+    operations.addAll(transformer.toOverwrite(units));
+  }
+
+  @Override
+  protected void addDelete(Delete delete) {
+    operations.add(transformer.toDelete(delete));
+  }
+
+  private void executeOperations(List<MongoOperation> operations) {
+    ClientSession clientSession = context.client().startSession();
+    String res = clientSession.withTransaction(() -> {
+      operations.forEach(o -> o.execute(context.table()));
+      return String.format("executed %s operations", operations.size());
+    });
+    logger.debug(res);
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoOperation.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoOperation.java
new file mode 100644
index 0000000..4e14b39
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoOperation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import com.mongodb.client.MongoCollection;
+import org.bson.Document;
+
+/**
+ * Mongo operation
+ */
+public interface MongoOperation {
+  void execute(MongoCollection<Document> collection);
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoRead.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoRead.java
new file mode 100644
index 0000000..a1d7d42
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/MongoRead.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.transform.FilterTransformer;
+import org.apache.drill.metastore.operate.AbstractRead;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.metastore.operate.Read;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link Read} interface based on {@link AbstractRead} parent class.
+ * Reads information from Mongo collection based on given filter expression.
+ * Supports reading information for specific columns.
+ *
+ * @param <T> Metastore component unit type
+ */
+public class MongoRead<T> extends AbstractRead<T> {
+
+  private final MongoMetastoreContext<T> context;
+
+  public MongoRead(MetadataTypeValidator metadataTypeValidator, MongoMetastoreContext<T> context) {
+    super(metadataTypeValidator);
+    this.context = context;
+  }
+
+  @Override
+  protected List<T> internalExecute() {
+    FilterTransformer filterTransformer = context.transformer().filter();
+    Bson rowFilter = filterTransformer.combine(
+      filterTransformer.transform(metadataTypes), filterTransformer.transform(filter));
+    List<Document> documents = Lists.newLinkedList();
+    context.table().find(rowFilter).forEach(documents::add);
+    return context.transformer().outputData()
+      .columns(columns.stream().map(MetastoreColumn::columnName).collect(Collectors.toList()))
+      .documents(documents)
+      .execute();
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/Overwrite.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/Overwrite.java
new file mode 100644
index 0000000..dedf851
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/operate/Overwrite.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.operate;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.ReplaceOptions;
+import org.apache.drill.metastore.mongo.exception.MongoMetastoreException;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Mongo overwrite operation: overwrites data with given document based on given row filter.
+ */
+public class Overwrite implements MongoOperation {
+
+  private final Bson filter;
+  private final Document data;
+
+  public Overwrite(Document data, Bson filter) {
+    this.data = data;
+    this.filter = filter;
+  }
+
+  public Bson filter() {
+    return filter;
+  }
+
+  public Document data() {
+    return data;
+  }
+
+  @Override
+  public void execute(MongoCollection<Document> collection) {
+    ReplaceOptions replaceOptions = new ReplaceOptions().upsert(true);
+    try {
+      collection.replaceOne(filter, data, replaceOptions);
+    } catch (Exception e) {
+      throw new MongoMetastoreException(
+        String.format("failed to overwrite document by %s into %s",
+          filter.toString(), collection.getNamespace()), e);
+    }
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterExpressionVisitor.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterExpressionVisitor.java
new file mode 100644
index 0000000..eb3b28d
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterExpressionVisitor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import com.mongodb.client.model.Filters;
+import org.apache.drill.metastore.expressions.DoubleExpressionPredicate;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.expressions.IsPredicate;
+import org.apache.drill.metastore.expressions.ListPredicate;
+import org.apache.drill.metastore.expressions.SimplePredicate;
+import org.apache.drill.metastore.expressions.SingleExpressionPredicate;
+import org.bson.conversions.Bson;
+
+/**
+ * Visits {@link FilterExpression} implementations and transforms them into {@link Bson} implementations.
+ */
+public class FilterExpressionVisitor implements FilterExpression.Visitor<Bson> {
+
+  private static final FilterExpressionVisitor INSTANCE = new FilterExpressionVisitor();
+
+  public static FilterExpression.Visitor<Bson> get() {
+    return INSTANCE;
+  }
+
+  @Override
+  public Bson visit(SimplePredicate.Equal<?> expression) {
+    return Filters.eq(expression.column().columnName(), expression.value());
+  }
+
+  @Override
+  public Bson visit(SimplePredicate.NotEqual<?> expression) {
+    return Filters.ne(expression.column().columnName(), expression.value());
+  }
+
+  @Override
+  public Bson visit(SimplePredicate.LessThan<?> expression) {
+    return Filters.lt(expression.column().columnName(), expression.value());
+  }
+
+  @Override
+  public Bson visit(SimplePredicate.LessThanOrEqual<?> expression) {
+    return Filters.lte(expression.column().columnName(), expression.value());
+  }
+
+  @Override
+  public Bson visit(SimplePredicate.GreaterThan<?> expression) {
+    return Filters.gt(expression.column().columnName(), expression.value());
+  }
+
+  @Override
+  public Bson visit(SimplePredicate.GreaterThanOrEqual<?> expression) {
+    return Filters.gte(expression.column().columnName(), expression.value());
+  }
+
+  @Override
+  public Bson visit(ListPredicate.In<?> expression) {
+    return Filters.in(expression.column().columnName(), expression.values());
+  }
+
+  @Override
+  public Bson visit(ListPredicate.NotIn<?> expression) {
+    return Filters.nin(expression.column().columnName(), expression.values());
+  }
+
+  @Override
+  public Bson visit(IsPredicate.IsNull expression) {
+    return Filters.exists(expression.column().columnName(), false);
+  }
+
+  @Override
+  public Bson visit(IsPredicate.IsNotNull expression) {
+    return Filters.exists(expression.column().columnName());
+  }
+
+  @Override
+  public Bson visit(SingleExpressionPredicate.Not expression) {
+    Bson child = expression.expression().accept(this);
+    return Filters.not(child);
+  }
+
+  @Override
+  public Bson visit(DoubleExpressionPredicate.And expression) {
+    Bson right = expression.right().accept(this);
+    Bson left = expression.left().accept(this);
+    return Filters.and(right, left);
+  }
+
+  @Override
+  public Bson visit(DoubleExpressionPredicate.Or expression) {
+    Bson right = expression.right().accept(this);
+    Bson left = expression.left().accept(this);
+    return Filters.or(right, left);
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterTransformer.java
new file mode 100644
index 0000000..abd1d4a
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/FilterTransformer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import com.mongodb.client.model.Filters;
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Transforms given input into Mongo {@link Document} which is used as filter
+ * to retrieve, overwrite or delete Metastore component data.
+ */
+public class FilterTransformer {
+
+  public Bson transform(FilterExpression filter) {
+    return filter == null ? new Document() : filter.accept(FilterExpressionVisitor.get());
+  }
+
+  public Bson transform(Set<MetadataType> metadataTypes) {
+    if (metadataTypes.contains(MetadataType.ALL)) {
+      return new BsonDocument();
+    }
+
+    Set<String> inConditionValues = metadataTypes.stream()
+      .map(Enum::name)
+      .collect(Collectors.toSet());
+
+    if (inConditionValues.size() == 1) {
+      return Filters.eq(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues.iterator().next());
+    }
+    return Filters.in(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues);
+  }
+
+  public Bson combine(Bson... expressions) {
+    if (expressions.length == 0) {
+      return new BsonDocument();
+    }
+    if (expressions.length == 1) {
+      return expressions[0];
+    }
+    return Filters.and(expressions[0], expressions[1]);
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/InputDataTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/InputDataTransformer.java
new file mode 100644
index 0000000..0238952
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/InputDataTransformer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import com.google.gson.Gson;
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.metastore.mongo.exception.MongoMetastoreException;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.bson.Document;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Converts list of Metastore component units into {@link Document}.
+ *
+ * @param <T> Metastore component unit type
+ */
+public class InputDataTransformer<T> {
+  private static final Gson GSON = new Gson();
+  private final List<T> units = new ArrayList<>();
+  private final Map<String, MethodHandle> unitGetters;
+
+  public InputDataTransformer(Map<String, MethodHandle> unitGetters) {
+    this.unitGetters = unitGetters;
+  }
+
+  public InputDataTransformer<T> units(List<T> units) {
+    this.units.addAll(units);
+    return this;
+  }
+
+  public Document createId(Document document) {
+    return Document.parse(GSON.toJson(ImmutableMap.of(
+      MetastoreColumn.STORAGE_PLUGIN.columnName(), document.get(MetastoreColumn.STORAGE_PLUGIN.columnName()),
+      MetastoreColumn.WORKSPACE.columnName(), document.get(MetastoreColumn.WORKSPACE.columnName()),
+      MetastoreColumn.TABLE_NAME.columnName(), document.get(MetastoreColumn.TABLE_NAME.columnName()),
+      MetastoreColumn.METADATA_TYPE.columnName(), document.get(MetastoreColumn.METADATA_TYPE.columnName()),
+      MetastoreColumn.METADATA_IDENTIFIER.columnName(), document.get(MetastoreColumn.METADATA_IDENTIFIER.columnName()))));
+  }
+
+  public List<Document> execute() {
+    return units.stream().map(unit -> {
+      Document document = new Document();
+      for (Map.Entry<String, MethodHandle> entry : unitGetters.entrySet()) {
+        try {
+          document.put(entry.getKey(), entry.getValue().invoke(unit));
+        } catch (Throwable e) {
+          throw new MongoMetastoreException(
+            String.format("Unable to invoke getter for column [%s] using [%s]", entry.getKey(), entry.getValue()), e);
+        }
+      }
+      return document.append(MongoConfigConstants.ID, createId(document));
+    }).collect(Collectors.toList());
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OperationTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OperationTransformer.java
new file mode 100644
index 0000000..405a379
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OperationTransformer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.mongo.MongoMetastoreContext;
+import org.apache.drill.metastore.mongo.operate.MongoDelete;
+import org.apache.drill.metastore.mongo.operate.MongoOperation;
+import org.apache.drill.metastore.mongo.operate.Overwrite;
+import org.apache.drill.metastore.operate.Delete;
+
+import java.util.List;
+
+/**
+ * Base class to transforms given input into {@link MongoOperation} implementations.
+ *
+ * @param <T> Metastore component unit type
+ */
+public abstract class OperationTransformer<T> {
+
+  protected final MongoMetastoreContext<T> context;
+
+  protected OperationTransformer(MongoMetastoreContext<T> context) {
+    this.context = context;
+  }
+
+  public MongoDelete toDeleteAll() {
+    return new MongoDelete(context.transformer().filter().transform((FilterExpression) null));
+  }
+
+  public MongoDelete toDelete(Delete delete) {
+    return new MongoDelete(context.transformer().filter().transform(delete.filter()));
+  }
+
+  /**
+   * Converts given list of Metastore components units into list of overwrite operations.
+   * Specific for each Metastore component.
+   *
+   * @param units Metastore component units
+   * @return list of overwrite operations
+   */
+  public abstract List<Overwrite> toOverwrite(List<T> units);
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OutputDataTransformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OutputDataTransformer.java
new file mode 100644
index 0000000..2681750
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/OutputDataTransformer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.bson.Document;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class to convert list of {@link Document}
+ * into Metastore component units for the given list of column names.
+ *
+ * @param <T> Metastore component unit type
+ */
+public abstract class OutputDataTransformer<T> {
+  private final Map<String, MethodHandle> unitSetters;
+  private final List<String> columns = new ArrayList<>();
+  private final List<Document> documents = new ArrayList<>();
+
+  protected OutputDataTransformer(Map<String, MethodHandle> unitSetters) {
+    this.unitSetters = unitSetters;
+  }
+
+  public OutputDataTransformer<T> columns(List<String> columns) {
+    this.columns.addAll(columns);
+    return this;
+  }
+
+  public OutputDataTransformer<T> documents(List<Document> documents) {
+    this.documents.addAll(documents);
+    return this;
+  }
+
+  /**
+   * Converts given list of {@link Document} into Metastore component units.
+   * Specific for each Metastore component.
+   *
+   * @return list of Metastore component units
+   */
+  public abstract List<T> execute();
+
+  /**
+   * For each given record prepares specific methods handler and its value
+   * to be set into Metastore specific component unit.
+   * Ignores absent setters for columns and null values.
+   *
+   * @return list of methods handlers and values to set
+   */
+  protected List<Map<MethodHandle, Object>> valuesToSet() {
+    List<Map<MethodHandle, Object>> results = Lists.newLinkedList();
+    for (Document doc : documents) {
+      Map<MethodHandle, Object> handlerMap = Maps.newHashMap();
+      for (Map.Entry<String, Object> entry : doc.entrySet()) {
+        if (unitSetters.containsKey(entry.getKey())
+          && (columns.isEmpty() || columns.contains(entry.getKey()))) {
+          handlerMap.put(unitSetters.get(entry.getKey()), entry.getValue());
+        }
+      }
+      if (!handlerMap.isEmpty()) {
+        results.add(handlerMap);
+      }
+    }
+    return results;
+  }
+}
diff --git a/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/Transformer.java b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/Transformer.java
new file mode 100644
index 0000000..bd59371
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/java/org/apache/drill/metastore/mongo/transform/Transformer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.transform;
+
+/**
+ * Provides various mechanism implementations to transform filters, data and operations.
+ *
+ * @param <T> component unit type
+ */
+public interface Transformer<T> {
+
+  /**
+   * Creates filter transformer. Since filter transformer does not
+   * depend on specific Metastore component implementation, provides
+   * it as default method.
+   *
+   * @return filter transformer
+   */
+  default FilterTransformer filter() {
+    return new FilterTransformer();
+  }
+
+  /**
+   * @return input data transformer for specific Metastore component
+   */
+  InputDataTransformer<T> inputData();
+
+  /**
+   * @return output data transformer for specific Metastore component
+   */
+  OutputDataTransformer<T> outputData();
+
+  /**
+   * @return operation transformer for specific Metastore component
+   */
+  OperationTransformer<T> operation();
+}
diff --git a/metastore/mongo-metastore/src/main/resources/drill-metastore-module.conf b/metastore/mongo-metastore/src/main/resources/drill-metastore-module.conf
new file mode 100644
index 0000000..1b83410
--- /dev/null
+++ b/metastore/mongo-metastore/src/main/resources/drill-metastore-module.conf
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+drill.metastore.mongo: {
+    connection: "mongodb://localhost:27017"
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/MongoBaseTest.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/MongoBaseTest.java
new file mode 100644
index 0000000..2a7f677
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/MongoBaseTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.MetastoreTest;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.components.tables.AbstractBasicTablesRequestsTest;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.containers.Network;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Category(MetastoreTest.class)
+public class MongoBaseTest extends AbstractBasicTablesRequestsTest {
+  private static final Logger logger = LoggerFactory.getLogger(MongoBaseTest.class);
+
+  private static final String MONGO_IMAGE_NAME = "mongo:4.4.10";
+  private static final int MONGO_PORT = 27017;
+  private static final String CONFIG_SERVER_HOST = "m0";
+  private static final String CONFIG_REPL_SET = "conf";
+  private static final String SHARD_REPL_SET_0 = "shard0";
+  private static final String SHARD_REPL_SET_1 = "shard1";
+  private static final List<GenericContainer<?>> containers = Lists.newArrayList();
+  protected static boolean isShardMode = Boolean.parseBoolean(
+    System.getProperty("drill.mongo.tests.shardMode", "false"));
+
+  @BeforeClass
+  public static void init() throws IOException, InterruptedException {
+    String connectionString;
+    if (isShardMode) {
+      connectionString = initCluster();
+    } else {
+      connectionString = initSingle();
+    }
+    Config config = DrillConfig.create()
+      .withValue(MongoConfigConstants.CONNECTION,
+        ConfigValueFactory.fromAnyRef(connectionString));
+    innerInit(config, MongoMetastore.class);
+  }
+
+  private static String initSingle() {
+    MongoDBContainer container = new MongoDBContainer(MONGO_IMAGE_NAME);
+    container.start();
+    containers.add(container);
+    return String.format("mongodb://%s:%d",
+      container.getContainerIpAddress(), container.getFirstMappedPort());
+  }
+
+  private static String initCluster() throws IOException, InterruptedException {
+    Network network = Network.newNetwork();
+    initConfigServer(network);
+    initShardServers(network);
+    String connectionString = initMongos(network);
+    shardCollection();
+    return connectionString;
+  }
+
+  private static void initConfigServer(Network network) throws IOException,
+    InterruptedException {
+    GenericContainer<?> configServer =
+      newContainer(network, "configsvr", CONFIG_REPL_SET, CONFIG_SERVER_HOST);
+    configServer.start();
+
+    Container.ExecResult execResult = configServer.execInContainer("/bin/bash", "-c",
+      String.format("echo 'rs.initiate({_id: \"%s\", configsvr: true, " +
+        "members: [{ _id : 0, host : \"%s:%s\" }]})' | mongo --port %3$s", CONFIG_REPL_SET, CONFIG_SERVER_HOST, MONGO_PORT));
+    logger.debug(execResult.toString());
+    containers.add(configServer);
+  }
+
+  private static void initShardServers(Network network) throws IOException,
+    InterruptedException {
+    List<GenericContainer<?>> shards = Lists.newArrayList();
+
+    shards.addAll(Stream.of("m1", "m2", "m3")
+      .map(host -> newContainer(network, "shardsvr", SHARD_REPL_SET_0, host))
+      .collect(Collectors.toList()));
+    shards.addAll(Stream.of("m4", "m5", "m6")
+      .map(host -> newContainer(network, "shardsvr", SHARD_REPL_SET_1, host))
+      .collect(Collectors.toList()));
+    shards.forEach(GenericContainer::start);
+
+    Container.ExecResult execResult = shards.get(0).execInContainer("/bin/bash",
+      "-c", String.format("mongo --port %s --eval 'printjson(rs.initiate(" +
+          "{_id:\"%s\",members:[{_id:0,host:\"m1:%1$s\"},{_id:1,host:\"m2:%1$s\"}," +
+          "{_id:2,host:\"m3:%1$s\"}]}))' --quiet",
+      MONGO_PORT, SHARD_REPL_SET_0));
+    logger.debug(execResult.toString());
+    execResult = shards.get(0).execInContainer("/bin/bash", "-c",
+      String.format("until mongo --port %s --eval \"printjson(rs.isMaster())\" " +
+        "| grep ismaster | grep true > /dev/null 2>&1;do sleep 1;done", MONGO_PORT));
+    logger.debug(execResult.toString());
+
+    execResult = shards.get(3).execInContainer("/bin/bash", "-c",
+      String.format("mongo --port %s --eval 'printjson(rs.initiate(" +
+          "{_id:\"%s\",members:[{_id:0,host:\"m4:%1$s\"},{_id:1,host:\"m5:%1$s\"}," +
+          "{_id:2,host:\"m6:%1$s\"}]}))' --quiet",
+        MONGO_PORT, SHARD_REPL_SET_1));
+    logger.debug(execResult.toString());
+    execResult = shards.get(3).execInContainer("/bin/bash", "-c",
+      String.format("until mongo --port %s --eval \"printjson(rs.isMaster())\" " +
+        "| grep ismaster | grep true > /dev/null 2>&1;do sleep 1;done", MONGO_PORT));
+    logger.debug(execResult.toString());
+    containers.addAll(shards);
+  }
+
+  private static String initMongos(Network network) throws IOException,
+    InterruptedException {
+    String mongosHost = "m7";
+    MongoDBContainer mongos = new MongoDBContainer(MONGO_IMAGE_NAME)
+      .withNetwork(network)
+      .withNetworkAliases(mongosHost)
+      .withExposedPorts(MONGO_PORT)
+      .withCommand(String.format("mongos --configdb %s/%s:%s --bind_ip " +
+          "localhost,%s --port %3$s", CONFIG_REPL_SET, CONFIG_SERVER_HOST, MONGO_PORT,
+        mongosHost));
+    mongos.start();
+
+    Container.ExecResult execResult = mongos.execInContainer("/bin/bash", "-c",
+      String.format("echo 'sh.addShard(\"%s/m1,m2,m3\")' | mongo --port %s",
+        SHARD_REPL_SET_0, MONGO_PORT));
+    logger.debug(execResult.toString());
+    execResult = mongos.execInContainer("/bin/bash", "-c",
+      String.format("echo 'sh.addShard(\"%s/m4,m5,m6\")' | mongo --port %s",
+        SHARD_REPL_SET_1, MONGO_PORT));
+    logger.debug(execResult.toString());
+
+    logger.debug("Execute list shards.");
+    execResult = mongos.execInContainer("/bin/bash", "-c", "mongo --eval 'db" +
+      ".adminCommand({ listShards: 1 })' --port " + MONGO_PORT);
+    logger.debug(execResult.toString());
+    containers.add(mongos);
+
+    // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
+    return String.format("mongodb://%s:%s", mongos.getContainerIpAddress(), mongos.getMappedPort(MONGO_PORT));
+  }
+
+  private static void shardCollection() throws IOException, InterruptedException {
+    // Enabled sharding at database level
+    logger.debug("Enabled sharding for database: {}", MongoConfigConstants.DEFAULT_DATABASE);
+    Container.ExecResult execResult = containers.get(containers.size() - 1)
+      .execInContainer("/bin/bash", "-c",
+        String.format("mongo --eval 'db.adminCommand({enableSharding:\"%s\"})'",
+          MongoConfigConstants.DEFAULT_DATABASE));
+    logger.debug(execResult.toString());
+
+    // Shard the collection
+    logger.debug("Shard the collection: {}.{}", MongoConfigConstants.DEFAULT_DATABASE,
+      MongoConfigConstants.DEFAULT_TABLE_COLLECTION);
+    execResult = containers.get(containers.size() - 1)
+      .execInContainer("/bin/bash", "-c",
+        String.format("echo 'sh.shardCollection(\"%s.%s\", {\"%s\" : \"hashed\"})' " +
+            "| mongo ", MongoConfigConstants.DEFAULT_DATABASE,
+          MongoConfigConstants.DEFAULT_TABLE_COLLECTION, MongoConfigConstants.ID));
+    logger.debug(execResult.toString());
+  }
+
+  private static GenericContainer<?> newContainer(Network network, String type,
+                                                  String replSet, String host) {
+    return new GenericContainer<>(MONGO_IMAGE_NAME)
+      .withNetwork(network)
+      .withNetworkAliases(host)
+      .withExposedPorts(MONGO_PORT)
+      .withCommand(String.format("mongod --port %d --%s --replSet %s " +
+        "--bind_ip localhost,%s", MONGO_PORT, type, replSet, host));
+  }
+
+  @AfterClass
+  public static void tearDownCluster() {
+    containers.forEach(GenericContainer::stop);
+  }
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestMongoBasicTablesRequests.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestMongoBasicTablesRequests.java
new file mode 100644
index 0000000..91b4fe7
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestMongoBasicTablesRequests.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import com.clearspring.analytics.util.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.metastore.components.tables.BasicTablesTransformer;
+import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.mongo.MongoBaseTest;
+import org.apache.drill.metastore.operate.Delete;
+import org.apache.drill.metastore.operate.Metadata;
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.base.Joiner;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMongoBasicTablesRequests extends MongoBaseTest {
+
+  @Test
+  public void testMetastoreTableInfoExistingTable() {
+    MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
+    assertTrue(metastoreTableInfo.isExists());
+    assertEquals(nationTableInfo, metastoreTableInfo.tableInfo());
+    assertEquals(nationTable.lastModifiedTime(), metastoreTableInfo.lastModifiedTime());
+    assertEquals(Metadata.UNDEFINED, metastoreTableInfo.metastoreVersion());
+  }
+
+  @Test
+  public void testDelete() {
+    MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
+    assertTrue(metastoreTableInfo.isExists());
+    tables.modify()
+      .delete(Delete.builder()
+        .metadataType(MetadataType.TABLE)
+        .filter(nationTableInfo.toFilter())
+        .build())
+      .execute();
+    metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
+    assertFalse(metastoreTableInfo.isExists());
+
+    List<TableMetadataUnit> res =
+      tables.read().metadataType(MetadataType.ALL).execute();
+    assertFalse(res.isEmpty());
+    tables.modify().purge();
+    res = tables.read().metadataType(MetadataType.ALL).execute();
+    assertTrue(res.isEmpty());
+
+    prepareData(tables);
+  }
+
+  @Test
+  public void testTableMetastoreSchemaParse() {
+    TableMetadataUnit unit = TableMetadataUnit.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .tableName("test")
+      .owner("user")
+      .tableType("json")
+      .metadataType(MetadataType.TABLE.name())
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .location("/tmp/donuts.json")
+      .schema("{\"type\":\"tuple_schema\",\"columns\":[" +
+        "{\"name\":\"id\",\"type\":\"VARCHAR\",\"mode\":\"OPTIONAL\"}," +
+        "{\"name\":\"type\",\"type\":\"VARCHAR\",\"mode\":\"OPTIONAL\"}," +
+        "{\"name\":\"name\",\"type\":\"VARCHAR\",\"mode\":\"OPTIONAL\"}," +
+        "{\"name\":\"ppu\",\"type\":\"DOUBLE\",\"mode\":\"OPTIONAL\"}," +
+        "{\"name\":\"sales\",\"type\":\"BIGINT\",\"mode\":\"OPTIONAL\"}," +
+        "{\"name\":\"batters\",\"type\":\"STRUCT<`batter` ARRAY<STRUCT<`id` VARCHAR, `type` VARCHAR>>>\",\"mode\":\"REQUIRED\"}," +
+        "{\"name\":\"topping\",\"type\":\"ARRAY<STRUCT<`id` VARCHAR, `type` VARCHAR>>\",\"mode\":\"REPEATED\"}," +
+        "{\"name\":\"filling\",\"type\":\"ARRAY<STRUCT<`id` VARCHAR, `type` VARCHAR>>\",\"mode\":\"REPEATED\"}]}")
+      .lastModifiedTime(System.currentTimeMillis())
+      .columnsStatistics(Maps.newHashMap())
+      .metadataStatistics(Lists.newArrayList())
+      .partitionKeys(Maps.newHashMap())
+      .build();
+    List<SchemaPath> schemaPaths =
+      SchemaUtil.getSchemaPaths(BasicTablesTransformer.tables(Collections.singletonList(unit)).get(0).getSchema());
+    Set<String> schemaSet =
+      schemaPaths.stream().map(SchemaPath::toString).collect(Collectors.toSet());
+    assertEquals(String.format("Schema path is not parsed correctly:%s",
+      Joiner.on(",").join(schemaPaths)), schemaPaths.size(), schemaSet.size());
+  }
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesInputDataTransformer.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesInputDataTransformer.java
new file mode 100644
index 0000000..e533b2d
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesInputDataTransformer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.metastore.mongo.transform.InputDataTransformer;
+import org.bson.Document;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTablesInputDataTransformer {
+  @Test
+  public void testNoData() {
+    List<Document> documents =
+      new InputDataTransformer<TableMetadataUnit>(TableMetadataUnit.SCHEMA.unitGetters())
+        .units(Collections.emptyList())
+        .execute();
+    assertEquals(Collections.emptyList(), documents);
+  }
+
+  @Test
+  public void testValidDataOneRecord() {
+    Map<String, String> partitionKeys = new HashMap<>();
+    partitionKeys.put("dir0", "2018");
+    partitionKeys.put("dir1", "2019");
+    List<String> partitionValues = Arrays.asList("a", "b", "c");
+    Long lastModifiedTime = System.currentTimeMillis();
+
+    TableMetadataUnit unit = TableMetadataUnit.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .tableName("nation")
+      .metadataType(MetadataType.TABLE.name())
+      .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
+      .partitionKeys(partitionKeys)
+      .partitionValues(partitionValues)
+      .lastModifiedTime(lastModifiedTime)
+      .build();
+
+    InputDataTransformer<TableMetadataUnit> inputDataTransformer =
+      new InputDataTransformer<>(TableMetadataUnit.SCHEMA.unitGetters());
+    List<Document> documents = inputDataTransformer
+      .units(Collections.singletonList(unit))
+      .execute();
+
+    Document tableRecord = new Document();
+    tableRecord.append("storagePlugin", "dfs");
+    tableRecord.append("workspace", "tmp");
+    tableRecord.append("tableName", "nation");
+    tableRecord.append("metadataType", "TABLE");
+    tableRecord.append("metadataIdentifier", MetadataInfo.GENERAL_INFO_KEY);
+    assertEquals(tableRecord, documents.get(0).get(MongoConfigConstants.ID));
+    assertEquals(partitionKeys, documents.get(0).get("partitionKeys"));
+    assertEquals(partitionValues, documents.get(0).get("partitionValues"));
+    assertEquals(lastModifiedTime, documents.get(0).get("lastModifiedTime"));
+  }
+
+  @Test
+  public void testValidDataSeveralRecords() {
+    List<TableMetadataUnit> units = Arrays.asList(
+      TableMetadataUnit.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataType(MetadataType.TABLE.name())
+        .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
+        .column("a")
+        .build(),
+      TableMetadataUnit.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataType(MetadataType.TABLE.name())
+        .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
+        .column("b")
+        .build(),
+      TableMetadataUnit.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataType(MetadataType.TABLE.name())
+        .metadataIdentifier(MetadataInfo.GENERAL_INFO_KEY)
+        .column("c")
+        .build());
+
+    InputDataTransformer<TableMetadataUnit> inputDataTransformer =
+      new InputDataTransformer<>(TableMetadataUnit.SCHEMA.unitGetters());
+    List<Document> documents = inputDataTransformer
+      .units(units)
+      .execute();
+
+    Document tableRecord1 = new Document();
+    tableRecord1.append("storagePlugin", "dfs");
+    tableRecord1.append("workspace", "tmp");
+    tableRecord1.append("tableName", "nation");
+    tableRecord1.append("metadataType", "TABLE");
+    tableRecord1.append("metadataIdentifier", MetadataInfo.GENERAL_INFO_KEY);
+
+    Document tableRecord2 = new Document();
+    tableRecord2.append("storagePlugin", "dfs");
+    tableRecord2.append("workspace", "tmp");
+    tableRecord2.append("tableName", "nation");
+    tableRecord2.append("metadataType", "TABLE");
+    tableRecord2.append("metadataIdentifier", MetadataInfo.GENERAL_INFO_KEY);
+
+    Document tableRecord3 = new Document();
+    tableRecord3.append("storagePlugin", "dfs");
+    tableRecord3.append("workspace", "tmp");
+    tableRecord3.append("tableName", "nation");
+    tableRecord3.append("metadataType", "TABLE");
+    tableRecord3.append("metadataIdentifier", MetadataInfo.GENERAL_INFO_KEY);
+
+    assertEquals(tableRecord1, documents.get(0).get(MongoConfigConstants.ID));
+    assertEquals(tableRecord2, documents.get(1).get(MongoConfigConstants.ID));
+    assertEquals(tableRecord3, documents.get(2).get(MongoConfigConstants.ID));
+  }
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOperationTransformer.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOperationTransformer.java
new file mode 100644
index 0000000..cde42b4
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOperationTransformer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import com.mongodb.client.model.Filters;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.mongo.MongoMetastore;
+import org.apache.drill.metastore.mongo.config.MongoConfigConstants;
+import org.apache.drill.metastore.mongo.operate.MongoDelete;
+import org.apache.drill.metastore.mongo.operate.Overwrite;
+import org.apache.drill.metastore.mongo.transform.InputDataTransformer;
+import org.apache.drill.metastore.mongo.transform.OperationTransformer;
+import org.apache.drill.metastore.operate.Delete;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTablesOperationTransformer {
+
+  private static OperationTransformer<TableMetadataUnit> transformer;
+  private static MongoMetastore metastore;
+
+  @BeforeClass
+  public static void init() {
+    DrillConfig drillConfig = new DrillConfig(DrillConfig.create().withValue(MongoConfigConstants.CONNECTION,
+      ConfigValueFactory.fromAnyRef("mongodb://localhost:27017/?connectTimeoutMS=60000&maxPoolSize=1000&safe=true")));
+    metastore = new MongoMetastore(drillConfig);
+    transformer = new TablesOperationTransformer(((MongoTables) metastore.tables()).context());
+  }
+
+  @Test
+  public void testToOverwriteOperation() {
+    TableMetadataUnit unit = TableMetadataUnit.builder()
+      .storagePlugin("dfs").workspace("tmp").tableName("nation")
+      .metadataType(MetadataType.TABLE.name()).metadataIdentifier("s1").build();
+    List<Overwrite> operations = transformer.toOverwrite(Collections.singletonList(unit));
+    InputDataTransformer<TableMetadataUnit> inputDataTransformer =
+      ((MongoTables) metastore.tables()).transformer().inputData();
+    Document expected = new Document();
+    expected.append("storagePlugin", "dfs");
+    expected.append("workspace", "tmp");
+    expected.append("tableName", "nation");
+    expected.append("metadataType", MetadataType.TABLE.name());
+    expected.append("metadataIdentifier", "s1");
+
+    assertEquals(new Document()
+        .append(MongoConfigConstants.ID, inputDataTransformer.createId(expected)),
+      operations.get(0).filter());
+    assertEquals(expected, operations.get(0).data().get(MongoConfigConstants.ID));
+  }
+
+  @Test
+  public void testToOverwriteOperations() {
+    List<TableMetadataUnit> units = Arrays.asList(
+      TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+        .tableName("nation").metadataType(MetadataType.ROW_GROUP.name())
+        .metadataIdentifier("s1/nation2.parquet/0").build(),
+      TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+        .tableName("nation").metadataType(MetadataType.ROW_GROUP.name())
+        .metadataIdentifier("s1/nation.parquet/0").build(),
+      TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+        .tableName("nation").metadataType(MetadataType.FILE.name())
+        .metadataIdentifier("s1/nation2.parquet").build(),
+      TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+        .tableName("nation").metadataType(MetadataType.FILE.name())
+        .metadataIdentifier("s1/nation.parquet").build(),
+      TableMetadataUnit.builder().storagePlugin("dfs").workspace("tmp")
+        .tableName("region").metadataType(MetadataType.SEGMENT.name())
+        .metadataIdentifier("s1").build(),
+      TableMetadataUnit.builder().storagePlugin("s3").workspace("tmp")
+        .tableName("region").metadataType(MetadataType.TABLE.name())
+        .metadataIdentifier("GENERAL_INFO").build());
+
+    List<Overwrite> operations = transformer.toOverwrite(units);
+    assertEquals(6, operations.size());
+  }
+
+  @Test
+  public void testToDeleteOperation() {
+    Bson expected = Filters.and(
+      Filters.eq(MetastoreColumn.STORAGE_PLUGIN.columnName(), "dfs"),
+      Filters.eq(MetastoreColumn.WORKSPACE.columnName(), "tmp"));
+    FilterExpression filter = FilterExpression.and(
+      FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
+      FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp"));
+
+    Delete delete = Delete.builder()
+      .metadataType(MetadataType.ALL)
+      .filter(filter)
+      .build();
+
+    MongoDelete operation = transformer.toDelete(delete);
+    assertEquals(expected.toString(), operation.filter().toString());
+  }
+}
diff --git a/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOutputDataTransformer.java b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOutputDataTransformer.java
new file mode 100644
index 0000000..52010ac
--- /dev/null
+++ b/metastore/mongo-metastore/src/test/java/org/apache/drill/metastore/mongo/components/tables/TestTablesOutputDataTransformer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.mongo.components.tables;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.bson.Document;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTablesOutputDataTransformer {
+
+  private static Map<String, MethodHandle> unitSetters;
+
+  @BeforeClass
+  public static void init() {
+    unitSetters = TableMetadataUnit.SCHEMA.unitBuilderSetters();
+  }
+
+  @Test
+  public void testNoData() {
+    List<TableMetadataUnit> actualResult = new TablesOutputDataTransformer(unitSetters)
+      .columns(Arrays.asList("storagePlugin", "workspace", "tableName"))
+      .documents(Collections.emptyList())
+      .execute();
+
+    assertEquals(Collections.emptyList(), actualResult);
+  }
+
+  @Test
+  public void testValidDataOneRecord() {
+    Map<String, String> partitionKeys = new HashMap<>();
+    partitionKeys.put("dir0", "2018");
+    partitionKeys.put("dir1", "2019");
+    List<String> partitionValues = Arrays.asList("a", "b", "c");
+    Long lastModifiedTime = System.currentTimeMillis();
+
+    Document record = new Document();
+    record.append("storagePlugin", "dfs");
+    record.append("workspace", "tmp");
+    record.append("tableName", "nation");
+    record.append("partitionKeys", partitionKeys);
+    record.append("partitionValues", partitionValues);
+    record.append("lastModifiedTime", lastModifiedTime);
+
+    List<TableMetadataUnit> actualResult = new TablesOutputDataTransformer(unitSetters)
+      .columns(Arrays.asList("storagePlugin", "workspace", "tableName",
+        "partitionKeys", "partitionValues", "lastModifiedTime"))
+      .documents(Collections.singletonList(record))
+      .execute();
+
+    List<TableMetadataUnit> expectedResult = Collections.singletonList(TableMetadataUnit.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .tableName("nation")
+      .partitionKeys(partitionKeys)
+      .partitionValues(partitionValues)
+      .lastModifiedTime(lastModifiedTime)
+      .build());
+
+    assertEquals(expectedResult, actualResult);
+  }
+
+  @Test
+  public void testValidDataSeveralRecords() {
+    Document record1 = new Document();
+    record1.append("tableName", "a");
+
+    Document record2 = new Document();
+    record2.append("tableName", "b");
+
+    Document record3 = new Document();
+    record3.append("tableName", "c");
+
+
+    List<TableMetadataUnit> actualResult = new TablesOutputDataTransformer(unitSetters)
+      .columns(Collections.singletonList("tableName"))
+      .documents(Arrays.asList(record1, record2, record3))
+      .execute();
+
+    List<TableMetadataUnit> expectedResult = Arrays.asList(
+      TableMetadataUnit.builder().tableName("a").build(),
+      TableMetadataUnit.builder().tableName("b").build(),
+      TableMetadataUnit.builder().tableName("c").build());
+
+    assertEquals(expectedResult, actualResult);
+  }
+
+  @Test
+  public void testInvalidColumns() {
+    Document record = new Document();
+    record.append("tableName", "a");
+
+    List<TableMetadataUnit> actualResult = new TablesOutputDataTransformer(unitSetters)
+      .documents(Collections.singletonList(record))
+      .columns(Arrays.asList("a", "b"))
+      .execute();
+
+    assertEquals(Collections.emptyList(), actualResult);
+  }
+}
diff --git a/metastore/pom.xml b/metastore/pom.xml
index f962ec3..b5d977e 100644
--- a/metastore/pom.xml
+++ b/metastore/pom.xml
@@ -46,5 +46,6 @@
     <module>metastore-api</module>
     <module>iceberg-metastore</module>
     <module>rdbms-metastore</module>
+    <module>mongo-metastore</module>
   </modules>
 </project>
diff --git a/pom.xml b/pom.xml
index ed7a0f8..f711ecb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,7 @@
     <aircompressor.version>0.20</aircompressor.version>
     <iceberg.version>0.12.1</iceberg.version>
     <univocity-parsers.version>2.8.3</univocity-parsers.version>
+    <mongo.version>4.3.3</mongo.version>
     <junit.args/>
   </properties>