Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/28 00:31:54 UTC

[hudi] branch master updated: [HUDI-703] Add test for HoodieSyncCommand (#1774)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fa41921  [HUDI-703] Add test for HoodieSyncCommand (#1774)
fa41921 is described below

commit fa419213f62d2006aeea228180302024977feb16
Author: hongdd <jn...@163.com>
AuthorDate: Tue Jul 28 08:31:43 2020 +0800

    [HUDI-703] Add test for HoodieSyncCommand (#1774)
---
 docker/demo/config/hoodie-incr.properties          |  31 ++++
 docker/demo/config/hoodie-schema.avsc              | 145 ++++++++++++++++
 docker/demo/sync-validate.commands                 |  19 +++
 hudi-cli/pom.xml                                   |  17 ++
 .../hudi/cli/commands/HoodieSyncCommand.java       |   4 +-
 .../org/apache/hudi/integ/HoodieTestHiveBase.java  | 121 +++++++++++++
 .../java/org/apache/hudi/integ/ITTestBase.java     |   4 +-
 .../org/apache/hudi/integ/ITTestHoodieSanity.java  |   6 +-
 .../integ/command/ITTestHoodieSyncCommand.java     |  75 ++++++++
 .../src/test/resources/hoodie-docker.properties    |  18 ++
 hudi-spark/run_hoodie_app.sh                       |   4 +-
 .../src/test/java/HoodieJavaGenerateApp.java       | 190 +++++++++++++++++++++
 12 files changed, 625 insertions(+), 9 deletions(-)
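
In brief: the substantive fix is the pair of swapped ternaries in HoodieSyncCommand.java (see that hunk below). The remaining files add a docker-based integration test for the CLI's "sync validate" command, ITTestHoodieSyncCommand, together with the properties file, Avro schema, CLI command script, and data-generator app it relies on.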

diff --git a/docker/demo/config/hoodie-incr.properties b/docker/demo/config/hoodie-incr.properties
new file mode 100644
index 0000000..95a6627
--- /dev/null
+++ b/docker/demo/config/hoodie-incr.properties
@@ -0,0 +1,31 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+hoodie.upsert.shuffle.parallelism=2
+hoodie.insert.shuffle.parallelism=2
+hoodie.bulkinsert.shuffle.parallelism=2
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.partitionpath.field=partition
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:///var/hoodie/ws/docker/demo/config/hoodie-schema.avsc
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:///var/hoodie/ws/docker/demo/config/hoodie-schema.avsc
+hoodie.deltastreamer.source.hoodieincr.partition.fields=partition
+hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
+hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
+# hive sync
+hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
+hoodie.datasource.hive_sync.partition_fields=partition
\ No newline at end of file
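
These properties drive the DeltaStreamer run in ITTestHoodieSyncCommand: HoodieIncrSource reads incrementally from /docker_hoodie_sync_valid_test (falling back to the latest snapshot when no checkpoint exists yet, per read_latest_on_missing_ckpt=true), and Hive sync publishes the result as docker_hoodie_sync_valid_test_2.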
diff --git a/docker/demo/config/hoodie-schema.avsc b/docker/demo/config/hoodie-schema.avsc
new file mode 100644
index 0000000..55e255f
--- /dev/null
+++ b/docker/demo/config/hoodie-schema.avsc
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+    "type": "record",
+    "name": "triprec",
+    "fields": [
+        {
+            "name": "timestamp",
+            "type": "double"
+        },
+        {
+            "name": "_row_key",
+            "type": "string"
+        },
+        {
+            "name": "rider",
+            "type": "string"
+        },
+        {
+            "name": "driver",
+            "type": "string"
+        },
+        {
+            "name": "begin_lat",
+            "type": "double"
+        },
+        {
+            "name": "begin_lon",
+            "type": "double"
+        },
+        {
+            "name": "end_lat",
+            "type": "double"
+        },
+        {
+            "name": "end_lon",
+            "type": "double"
+        },
+        {
+            "name": "distance_in_meters",
+            "type": "int"
+        },
+        {
+            "name": "seconds_since_epoch",
+            "type": "long"
+        },
+        {
+            "name": "weight",
+            "type": "float"
+        },
+        {
+            "name": "nation",
+            "type": "bytes"
+        },
+        {
+            "name": "current_date",
+            "type": {
+                "type": "int",
+                "logicalType": "date"
+            }
+        },
+        {
+            "name": "current_ts",
+            "type": {
+                "type": "long",
+                "logicalType": "timestamp-micros"
+            }
+        },
+        {
+            "name": "height",
+            "type": {
+                "type": "fixed",
+                "name": "abc",
+                "size": 5,
+                "logicalType": "decimal",
+                "precision": 10,
+                "scale": 6
+            }
+        },
+        {
+            "name": "city_to_state",
+            "type": {
+                "type": "map",
+                "values": "string"
+            }
+        },
+        {
+            "name": "fare",
+            "type": {
+                "type": "record",
+                "name": "fare",
+                "fields": [
+                    {
+                        "name": "amount",
+                        "type": "double"
+                    },
+                    {
+                        "name": "currency",
+                        "type": "string"
+                    }
+                ]
+            }
+        },
+        {
+            "name": "tip_history",
+            "type": {
+                "type": "array",
+                "items": {
+                    "type": "record",
+                    "name": "tip_history",
+                    "fields": [
+                        {
+                            "name": "amount",
+                            "type": "double"
+                        },
+                        {
+                            "name": "currency",
+                            "type": "string"
+                        }
+                    ]
+                }
+            }
+        },
+        {
+            "name": "_hoodie_is_deleted",
+            "type": "boolean",
+            "default": false
+        }
+    ]
+}
\ No newline at end of file
diff --git a/docker/demo/sync-validate.commands b/docker/demo/sync-validate.commands
new file mode 100644
index 0000000..32c334e
--- /dev/null
+++ b/docker/demo/sync-validate.commands
@@ -0,0 +1,19 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+connect --path /docker_hoodie_sync_valid_test
+commits sync --path /docker_hoodie_sync_valid_test_2
+sync validate --sourceDb default --targetDb default --hiveServerUrl jdbc:hive2://hiveserver:10000 --hiveUser hive --hivePass hive
\ No newline at end of file
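
The three CLI commands above connect to the source table at /docker_hoodie_sync_valid_test, load the second table's timeline via "commits sync", and then run "sync validate", which compares the two tables' commit timelines and row counts through HiveServer2.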
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 13bc848..388e414 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -181,6 +181,23 @@
       <type>test-jar</type>
     </dependency>
 
+    <!-- hive -->
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty.orbit</groupId>
+          <artifactId>javax.servlet</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop-bundle</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <!-- Logging -->
     <dependency>
       <groupId>log4j</groupId>
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
index e0ceb2e..66c2eb0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
@@ -74,9 +74,9 @@ public class HoodieSyncCommand implements CommandMarker {
     }
 
     String targetLatestCommit =
-        targetTimeline.getInstants().iterator().hasNext() ? "0" : targetTimeline.lastInstant().get().getTimestamp();
+        targetTimeline.getInstants().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0";
     String sourceLatestCommit =
-        sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
+        sourceTimeline.getInstants().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0";
 
     if (sourceLatestCommit != null
         && HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
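
The two ternaries above are the actual bug fix: before this commit the branches were swapped, so a timeline that had instants returned the "0" default, while an empty timeline called lastInstant().get() on an empty Option and failed. A minimal self-contained sketch of the corrected pattern (Timeline here is a hypothetical stand-in for HoodieTimeline, reduced for illustration):

    import java.util.List;
    import java.util.Optional;

    // Hypothetical, trimmed-down stand-in for Hudi's HoodieTimeline.
    class Timeline {
      private final List<String> instants;

      Timeline(List<String> instants) {
        this.instants = instants;
      }

      boolean hasInstants() {
        return !instants.isEmpty();
      }

      Optional<String> lastInstant() {
        return instants.isEmpty()
            ? Optional.empty()
            : Optional.of(instants.get(instants.size() - 1));
      }
    }

    public class LatestCommitExample {
      // Corrected branch order: dereference lastInstant() only when instants
      // exist; otherwise fall back to the "0" default.
      static String latestCommitOrDefault(Timeline timeline) {
        return timeline.hasInstants() ? timeline.lastInstant().get() : "0";
      }

      public static void main(String[] args) {
        // Non-empty timeline: the latest instant wins.
        System.out.println(latestCommitOrDefault(new Timeline(List.of("20200727", "20200728")))); // 20200728
        // Empty timeline: the default is returned instead of throwing.
        System.out.println(latestCommitOrDefault(new Timeline(List.of()))); // 0
      }
    }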
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
new file mode 100644
index 0000000..e2e9276
--- /dev/null
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ;
+
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for running commands and generating test data in Hive.
+ */
+public class HoodieTestHiveBase extends ITTestBase {
+
+  protected enum PartitionType {
+    SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
+  }
+
+  private static final int DEFAULT_TIME_WAIT = 5000;
+  private static final String OVERWRITE_COMMIT_TYPE = "overwrite";
+
+  /**
+   * Runs HoodieJavaGenerateApp to create a sample Hoodie data-set and optionally sync it to Hive.
+   * Hive integration is verified by checking that the synced table exists via the Hive console. TODO: Add
+   * spark-shell test-case
+   */
+  public void generateDataByHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType,
+      String commitType, String hoodieTableName) throws Exception {
+
+    String hdfsPath = getHDFSPath(hiveTableName);
+    String hdfsUrl = "hdfs://namenode" + hdfsPath;
+
+    Pair<String, String> stdOutErr;
+    if (OVERWRITE_COMMIT_TYPE.equals(commitType)) {
+      // Drop Table if it exists
+      try {
+        dropHiveTables(hiveTableName, tableType);
+      } catch (AssertionError ex) {
+        // On Travis the Hive metastore is sometimes not ready even though we wait for the port to be up.
+        // As a workaround, sleep and retry once.
+        // The sleep time is configurable via hoodie.hiveserver.time.wait (5 secs by default).
+        Thread.sleep(getTimeWait());
+        dropHiveTables(hiveTableName, tableType);
+      }
+
+      // Ensure table does not exist
+      stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
+      if (!stdOutErr.getLeft().isEmpty()) {
+        throw new TableExistsException("Table " + hiveTableName + " still exists after being dropped!");
+      }
+    }
+
+    // Run Hoodie Java App
+    String cmd = String.format("%s %s --hive-sync --table-path %s  --hive-url %s  --table-type %s  --hive-table %s" +
+        " --commit-type %s  --table-name %s", HOODIE_JAVA_APP, "HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL,
+        tableType, hiveTableName, commitType, hoodieTableName);
+    if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
+      cmd = cmd + " --use-multi-partition-keys";
+    } else if (partitionType == PartitionType.NON_PARTITIONED) {
+      cmd = cmd + " --non-partitioned";
+    }
+    executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
+
+    String snapshotTableName = getSnapshotTableName(tableType, hiveTableName);
+
+    // Ensure the table now exists
+    stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName + "'");
+    assertEquals(snapshotTableName, stdOutErr.getLeft(), "Table should exist after sync");
+  }
+
+  protected void dropHiveTables(String hiveTableName, String tableType) throws Exception {
+    if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
+      executeHiveCommand("drop table if exists " + hiveTableName + "_ro");
+    } else {
+      executeHiveCommand("drop table if exists " + hiveTableName);
+    }
+  }
+
+  protected String getHDFSPath(String hiveTableName) {
+    return "/" + hiveTableName;
+  }
+
+  protected String getSnapshotTableName(String tableType, String hiveTableName) {
+    return tableType.equals(HoodieTableType.MERGE_ON_READ.name())
+        ? hiveTableName + "_rt" : hiveTableName;
+  }
+
+  private int getTimeWait() {
+    try (InputStream stream = HoodieTestHiveBase.class.getClassLoader().getResourceAsStream("hoodie-docker.properties")) {
+      TypedProperties properties = new TypedProperties();
+      properties.load(stream);
+      return properties.getInteger("hoodie.hiveserver.time.wait", DEFAULT_TIME_WAIT);
+    } catch (IOException e) {
+      LOG.warn("Cannot load the property file; using the default wait time for HiveServer.");
+      return DEFAULT_TIME_WAIT;
+    }
+  }
+}
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index cf87273..c2312d0 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -180,7 +180,7 @@ public abstract class ITTestBase {
     }
   }
 
-  TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed)
+  protected TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed)
       throws Exception {
     LOG.info("\n\n#################################################################################################");
     LOG.info("Container : " + containerName + ", Running command :" + cmd);
@@ -190,7 +190,7 @@ public abstract class ITTestBase {
     return executeCommandInDocker(containerName, cmdSplits, expectedToSucceed);
   }
 
-  Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
+  protected Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
 
     LOG.info("\n\n#################################################################################################");
     LOG.info("Running hive command :" + hiveCommand);
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index 1d1ef8d..4b586a3 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -137,13 +137,13 @@ public class ITTestHoodieSanity extends ITTestBase {
     // Run Hoodie Java App
     String cmd;
     if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
-      cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName;
     } else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
-      cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys";
     } else {
-      cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
     }
     executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
new file mode 100644
index 0000000..a6a4c3e
--- /dev/null
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.command;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieTableType;
+
+import org.apache.hudi.integ.HoodieTestHiveBase;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for HoodieSyncCommand in hudi-cli module.
+ */
+public class ITTestHoodieSyncCommand extends HoodieTestHiveBase {
+
+  private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
+  private static final String SYNC_VALIDATE_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sync-validate.commands";
+
+  @Test
+  public void testValidateSync() throws Exception {
+    String hiveTableName = "docker_hoodie_sync_valid_test";
+    String hiveTableName2 = "docker_hoodie_sync_valid_test_2";
+
+    generateDataByHoodieJavaApp(
+        hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED, "overwrite", hiveTableName);
+
+    syncHoodieTable(hiveTableName2, "INSERT");
+
+    generateDataByHoodieJavaApp(
+        hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED, "append", hiveTableName);
+
+    TestExecStartResultCallback result =
+        executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " --cmdfile " + SYNC_VALIDATE_COMMANDS, true);
+
+    String expected = String.format("Count difference now is (count(%s) - count(%s) == %d. Catch up count is %d",
+        hiveTableName, hiveTableName2, 100, 200);
+    assertTrue(result.getStderr().toString().contains(expected));
+
+    dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
+    dropHiveTables(hiveTableName2, HoodieTableType.COPY_ON_WRITE.name());
+  }
+
+  private void syncHoodieTable(String hiveTableName, String op) throws Exception {
+    StringBuilder cmdBuilder = new StringBuilder("spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 ")
+        .append(" --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer ").append(HUDI_UTILITIES_BUNDLE)
+        .append(" --table-type COPY_ON_WRITE ")
+        .append(" --base-file-format ").append(HoodieFileFormat.PARQUET.toString())
+        .append(" --source-class org.apache.hudi.utilities.sources.HoodieIncrSource --source-ordering-field timestamp ")
+        .append(" --target-base-path ").append(getHDFSPath(hiveTableName))
+        .append(" --target-table ").append(hiveTableName)
+        .append(" --op ").append(op)
+        .append(" --props file:///var/hoodie/ws/docker/demo/config/hoodie-incr.properties")
+        .append(" --enable-hive-sync");
+    executeCommandStringInDocker(ADHOC_1_CONTAINER, cmdBuilder.toString(), true);
+  }
+}
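
The asserted numbers follow from the data generator: each HoodieJavaGenerateApp run inserts 100 records (see generateInserts(instantTime, 100) in HoodieJavaGenerateApp below). The first run overwrites the source table with 100 records, which the DeltaStreamer sync copies to the target; the second run appends 100 more to the source only. The source then holds 200 rows against the target's 100, so "sync validate" reports a count difference of 100 and a catch-up count of 200 (presumably the total the target should reach once caught up).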
diff --git a/hudi-integ-test/src/test/resources/hoodie-docker.properties b/hudi-integ-test/src/test/resources/hoodie-docker.properties
new file mode 100644
index 0000000..d511370
--- /dev/null
+++ b/hudi-integ-test/src/test/resources/hoodie-docker.properties
@@ -0,0 +1,18 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.hiveserver.time.wait=5000
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh
index 7093c70..e2acc6c 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_app.sh
@@ -36,5 +36,5 @@ fi
 
 OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
 #TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
-echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp "$@"
+echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
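
The script no longer hard-codes HoodieJavaApp: the main class is now passed as the first argument (e.g. run_hoodie_app.sh HoodieJavaApp ... or run_hoodie_app.sh HoodieJavaGenerateApp ...), which is why the ITTestHoodieSanity invocations above now prepend the class name.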
diff --git a/hudi-spark/src/test/java/HoodieJavaGenerateApp.java b/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
new file mode 100644
index 0000000..64245e9
--- /dev/null
+++ b/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieDataSourceHelpers;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.hive.NonPartitionedExtractor;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.testutils.DataSourceTestUtils;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HoodieJavaGenerateApp {
+  @Parameter(names = {"--table-path", "-p"}, description = "Path for Hoodie sample table")
+  private String tablePath = "file:///tmp/hoodie/sample-table";
+
+  @Parameter(names = {"--table-name", "-n"}, description = "Table name for Hoodie sample table")
+  private String tableName = "hoodie_test";
+
+  @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ")
+  private String tableType = HoodieTableType.COPY_ON_WRITE.name();
+
+  @Parameter(names = {"--hive-sync", "-hs"}, description = "Enable syncing to hive")
+  private Boolean enableHiveSync = false;
+
+  @Parameter(names = {"--hive-db", "-hd"}, description = "Hive database")
+  private String hiveDB = "default";
+
+  @Parameter(names = {"--hive-table", "-ht"}, description = "Hive table")
+  private String hiveTable = "hoodie_sample_test";
+
+  @Parameter(names = {"--hive-user", "-hu"}, description = "Hive username")
+  private String hiveUser = "hive";
+
+  @Parameter(names = {"--hive-password", "-hp"}, description = "Hive password")
+  private String hivePass = "hive";
+
+  @Parameter(names = {"--hive-url", "-hl"}, description = "Hive JDBC URL")
+  private String hiveJdbcUrl = "jdbc:hive2://localhost:10000";
+
+  @Parameter(names = {"--non-partitioned", "-np"}, description = "Use non-partitioned Table")
+  private Boolean nonPartitionedTable = false;
+
+  @Parameter(names = {"--use-multi-partition-keys", "-mp"}, description = "Use Multiple Partition Keys")
+  private Boolean useMultiPartitionKeys = false;
+
+  @Parameter(names = {"--commit-type", "-ct"}, description = "How may commits will run")
+  private String commitType = "overwrite";
+
+  @Parameter(names = {"--help", "-h"}, help = true)
+  public Boolean help = false;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieJavaGenerateApp.class);
+
+  public static void main(String[] args) throws Exception {
+    HoodieJavaGenerateApp cli = new HoodieJavaGenerateApp();
+    JCommander cmd = new JCommander(cli, null, args);
+
+    if (cli.help) {
+      cmd.usage();
+      System.exit(1);
+    }
+    try (SparkSession spark = cli.getOrCreateSparkSession()) {
+      cli.insert(spark);
+    }
+  }
+
+  private SparkSession getOrCreateSparkSession() {
+    // Spark session setup.
+    SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
+    spark.sparkContext().setLogLevel("WARN");
+    return spark;
+  }
+
+  private HoodieTestDataGenerator getDataGenerator() {
+    // Returns a generator for the records to be loaded in.
+    if (nonPartitionedTable) {
+      // All data goes to base-path
+      return new HoodieTestDataGenerator(new String[]{""});
+    } else {
+      return new HoodieTestDataGenerator();
+    }
+  }
+
+  /**
+   * Set up configs for syncing to Hive.
+   */
+  private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
+    if (enableHiveSync) {
+      LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+      writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
+          .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
+          .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
+          .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser)
+          .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass)
+          .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true");
+      if (nonPartitionedTable) {
+        writer = writer
+            .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
+                NonPartitionedExtractor.class.getCanonicalName())
+            .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
+      } else if (useMultiPartitionKeys) {
+        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day").option(
+            DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
+            MultiPartKeysValueExtractor.class.getCanonicalName());
+      } else {
+        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr");
+      }
+    }
+    return writer;
+  }
+
+  private void insert(SparkSession spark) throws IOException {
+    HoodieTestDataGenerator dataGen = getDataGenerator();
+
+    JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
+
+    // Generate some input records.
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    List<HoodieRecord> recordsSoFar = new ArrayList<>(dataGen.generateInserts(instantTime/* ignore */, 100));
+    List<String> records1 = DataSourceTestUtils.convertToStringList(recordsSoFar);
+    Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
+
+    // Save as a hoodie dataset (table type comes from --table-type)
+    // specify the hudi data source
+    DataFrameWriter<Row> writer = inputDF1.write().format("org.apache.hudi")
+        // any hoodie client config can be passed like this
+        .option("hoodie.insert.shuffle.parallelism", "2")
+        // full list in HoodieWriteConfig & its package
+        .option("hoodie.upsert.shuffle.parallelism", "2")
+        // Hoodie Table Type
+        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
+        // insert
+        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
+        // This is the record key
+        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
+        // this is the partition to place it into
+        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
+        // used to combine duplicate records in the input and with the value on disk
+        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
+        // Used by hive sync and queries
+        .option(HoodieWriteConfig.TABLE_NAME, tableName)
+        // Add Key Extractor
+        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
+            nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
+                : SimpleKeyGenerator.class.getCanonicalName())
+        .mode(commitType);
+
+    writer = updateHiveSyncConfig(writer);
+    // creates a new dataset if one does not exist yet
+    writer.save(tablePath); // ultimately where the dataset will be placed
+    FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());
+    String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+    LOG.info("Commit at instant time :" + commitInstantTime1);
+  }
+}