Posted to commits@hudi.apache.org by vi...@apache.org on 2019/07/26 11:34:39 UTC

[incubator-hudi] 02/03: Hoodie Demo automation

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch pom-bundle-cleanup
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

commit 69befdea8407bc20c6027dbcec8d4e904f8b05f5
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Thu Jul 11 15:04:08 2019 -0700

    Hoodie Demo automation
---
 .travis.yml                                        |   5 +-
 docker/demo/compaction_commands                    |   4 +
 docker/demo/config/dfs-source.properties           |  27 +++
 hoodie-cli/hoodie-cli.sh                           |   2 +-
 .../hoodie/cli/commands/CompactionCommand.java     |  15 +-
 hoodie-integ-test/pom.xml                          |  15 +-
 .../java/com/uber/hoodie/integ/ITTestBase.java     |  31 ++-
 .../com/uber/hoodie/integ/ITTestHoodieDemo.java    | 210 +++++++++++++++++++++
 pom.xml                                            |   4 +
 tools/run_travis_tests.sh                          |  17 ++
 10 files changed, 324 insertions(+), 6 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 2cad58f..493270c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,7 +3,8 @@ jdk:
 - oraclejdk8
 sudo: required
 env:
-- HUDI_QUIETER_LOGGING=1
+  - HUDI_QUIETER_LOGGING=1 TEST_SUITE=unit
+  - TEST_SUITE=integration
 services:
 - docker
 cache:
@@ -14,3 +15,5 @@ notifications:
     rooms:
       - secure: WNIZPBY//xf/xTJL1YUPzvPUDwjawaMM4IJ6IqxjRGcZCmuhNVu2XTJ3aL1g6X7ZcJKxJuwoU/TbSO8Dl6rgWSo/2OfyzBd4ks+hgeCsdycccTcvO8giQO1DOUGUSRdvUzOvKjWVK7iARYzQhoZawAYwI09UJLlwhYRCJ1IKc1ZksrEt964GeEmPyJbwMoZOJVUU84jJIAZPIpOFGTKM652FMermg9yaY2W5oSjDXaV98z0/mJV4Ry++J2v0fvoDs5HxkXYhZJP+dpWR82KDr6Q6LGL5/IlJ+b+IH3pF8LyKR4nCH6l1EZ8KpoFZapyYWYQpXMfQoF2K/JEQkpz1EqBCeEDSJ2+j1PPLhOWXd7ok4DsS26S8BP2ImvyXwua51THN1/r1fCGSIdxiQ5C8aeYmPCSr+oLChCVivEG2eeU34Z1nQJ5aDymNGeFE9qUUpjS0ETfFcjI/WQaA+FiYiPkDfeAo [...]
     on_pull_requests: false
+script:
+  tools/run_travis_tests.sh $TEST_SUITE
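The env matrix above fans Travis out into two builds, one per TEST_SUITE value, and the new script stanza routes both through tools/run_travis_tests.sh (added below). Either job can be reproduced locally, assuming a full checkout with Docker available:

    # Mirrors the two Travis jobs defined by the env matrix:
    tools/run_travis_tests.sh unit
    tools/run_travis_tests.sh integration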
diff --git a/docker/demo/compaction_commands b/docker/demo/compaction_commands
new file mode 100644
index 0000000..17d66fc
--- /dev/null
+++ b/docker/demo/compaction_commands
@@ -0,0 +1,4 @@
+connect --path /user/hive/warehouse/stock_ticks_mor
+compactions show all
+compaction schedule
+compaction run --parallelism 2 --sparkMemory 1G  --schemaFilePath /var/demo/config/schema.avsc --retry 1 
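This command file drives the compaction half of the demo non-interactively: connect to the MOR dataset, list compactions, schedule one, and run it. Note that `compaction run` omits --compactionInstant, relying on the earliest-pending-instant fallback added to CompactionCommand below. ITTestHoodieDemo replays the file through the CLI, roughly:

    # Run inside the adhoc-1 container, as the test does:
    /var/hoodie/ws/hoodie-cli/hoodie-cli.sh --cmdfile /var/hoodie/ws/docker/demo/compaction_commands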
diff --git a/docker/demo/config/dfs-source.properties b/docker/demo/config/dfs-source.properties
new file mode 100644
index 0000000..3e17661
--- /dev/null
+++ b/docker/demo/config/dfs-source.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+include=base.properties
+# Key fields, for kafka example
+hoodie.datasource.write.recordkey.field=key
+hoodie.datasource.write.partitionpath.field=date
+# Schema provider props (change to absolute path based on your installation)
+hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc
+hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc
+# DFS Source
+hoodie.deltastreamer.source.dfs.root=/usr/hive/data/input/
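These properties configure HoodieDeltaStreamer for the demo: the record key and partition path fields, a file-based schema provider, and the DFS directory to ingest from. They are supplied via --props; a condensed sketch of the COPY_ON_WRITE invocation, mirroring the commands ITTestHoodieDemo issues below:

    spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer \
      /var/hoodie/ws/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar \
      --storage-type COPY_ON_WRITE \
      --source-class com.uber.hoodie.utilities.sources.JsonDFSSource \
      --source-ordering-field ts \
      --target-base-path /user/hive/warehouse/stock_ticks_cow \
      --target-table stock_ticks_cow \
      --props /var/demo/config/dfs-source.properties \
      --schemaprovider-class com.uber.hoodie.utilities.schema.FilebasedSchemaProvider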
diff --git a/hoodie-cli/hoodie-cli.sh b/hoodie-cli/hoodie-cli.sh
index b67a067..e1909d5 100755
--- a/hoodie-cli/hoodie-cli.sh
+++ b/hoodie-cli/hoodie-cli.sh
@@ -13,4 +13,4 @@ fi
 if [ -z "$CLIENT_JAR" ]; then
   echo "client jar location not set"
 fi
-java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap
+java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
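Appending $@ forwards any command-line arguments (notably --cmdfile) through to Spring Shell's Bootstrap, which is what lets the demo drive the CLI in batch mode. An aside, not part of the commit: quoting the expansion would also preserve arguments containing spaces (the classpath wildcard is safe to quote, since it is expanded by the JVM rather than the shell):

    java -cp "${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR}" \
      -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} \
      org.springframework.shell.Bootstrap "$@"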
diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java
index 7ec4637..51f511d 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java
@@ -219,12 +219,23 @@ public class CompactionCommand implements CommandMarker {
       final String sparkMemory,
       @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries")
       final String retry,
-      @CliOption(key = "compactionInstant", mandatory = true, help = "Base path for the target hoodie dataset")
-      final String compactionInstantTime) throws Exception {
+      @CliOption(key = "compactionInstant", mandatory = false, help = "Base path for the target hoodie dataset")
+      String compactionInstantTime) throws Exception {
     boolean initialized = HoodieCLI.initConf();
     HoodieCLI.initFS(initialized);
 
     if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
+      if (null == compactionInstantTime) {
+        // pick outstanding one with lowest timestamp
+        Optional<String> firstPendingInstant = HoodieCLI.tableMetadata.reloadActiveTimeline()
+            .filterCompletedAndCompactionInstants().filter(instant -> instant.getAction()
+                .equals(HoodieTimeline.COMPACTION_ACTION)).firstInstant().map(HoodieInstant::getTimestamp);
+        if (!firstPendingInstant.isPresent()) {
+          return "NO PENDING COMPACTION TO RUN";
+        }
+        compactionInstantTime = firstPendingInstant.get();
+      }
+
       String sparkPropertiesPath = Utils.getDefaultPropertiesFile(
           scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
       SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
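Making --compactionInstant optional lets `compaction run` fall back to the earliest pending compaction on the timeline, so scripted callers need not know instant times in advance. Inside the CLI, that reduces the demo's command to:

    # From docker/demo/compaction_commands above; no --compactionInstant needed:
    compaction run --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1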
diff --git a/hoodie-integ-test/pom.xml b/hoodie-integ-test/pom.xml
index 8bb0639..e41281a 100644
--- a/hoodie-integ-test/pom.xml
+++ b/hoodie-integ-test/pom.xml
@@ -154,12 +154,25 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-failsafe-plugin</artifactId>
         <version>2.22.0</version>
+        <configuration>
+          <includes>
+            <include>**/ITT*.java</include>
+          </includes>
+        </configuration>
         <executions>
           <execution>
+            <phase>integration-test</phase>
             <goals>
               <goal>integration-test</goal>
             </goals>
           </execution>
+          <execution>
+            <id>verify</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>verify</goal>
+            </goals>
+          </execution>
         </executions>
       </plugin>
       <plugin>
@@ -183,7 +196,7 @@
           </execution>
           <execution>
             <id>down</id>
-            <phase>integration-test</phase>
+            <phase>post-integration-test</phase>
             <goals>
               <goal>down</goal>
             </goals>
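Two build fixes here: failsafe now explicitly targets ITT*-named classes and gains a verify execution, so integration-test failures actually fail the build; and the "down" execution moves to post-integration-test, so the demo cluster is torn down after the tests rather than alongside them. A sketch of running just this module's integration tests, assuming dependencies are already installed locally (flags per the root pom changes below):

    mvn verify -pl hoodie-integ-test -DskipUTs=true -B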
diff --git a/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestBase.java b/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestBase.java
index 995afe2..be34aba 100644
--- a/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestBase.java
+++ b/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestBase.java
@@ -32,13 +32,13 @@ import com.github.dockerjava.core.DockerClientConfig;
 import com.github.dockerjava.core.command.ExecStartResultCallback;
 import com.github.dockerjava.jaxrs.JerseyDockerCmdExecFactory;
 import com.google.common.collect.ImmutableList;
+import com.uber.hoodie.common.util.collection.Pair;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
@@ -59,6 +59,8 @@ public abstract class ITTestBase {
       HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hive-bundle.jar";
   protected static final String HUDI_SPARK_BUNDLE =
       HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-spark-bundle.jar";
+  protected static final String HUDI_UTILITIES_BUNDLE =
+      HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar";
   protected static final String HIVE_SERVER_JDBC_URL = "jdbc:hive2://hiveserver:10000";
   // Skip these lines when capturing output from hive
   protected static final Integer SLF4J_WARNING_LINE_COUNT_IN_HIVE_CMD = 9;
@@ -145,6 +147,33 @@ public abstract class ITTestBase {
     return callback;
   }
 
+  protected void executeCommandsInDocker(String containerName, List<String[]> commands) throws Exception {
+    for (String[] cmd : commands) {
+      TestExecStartResultCallback callback = executeCommandInDocker(containerName, cmd, true);
+      String stdout = callback.getStdout().toString();
+      String stderr = callback.getStderr().toString();
+      LOG.info("Got output for (" + Arrays.toString(cmd) + ") :" + stdout);
+      LOG.info("Got error output for (" + Arrays.toString(cmd) + ") :" + stderr);
+    }
+  }
+
+  protected Pair<String, String> executeHiveCommand(String hiveCommand, boolean expectedToSucceed) throws Exception {
+    String[] hiveTableCheck = getHiveConsoleCommand(hiveCommand);
+    TestExecStartResultCallback callback =
+        executeCommandInDocker(HIVESERVER, hiveTableCheck, expectedToSucceed);
+    String stderr = callback.getStderr().toString();
+    String stdout = callback.getStdout().toString();
+    LOG.info("Got output for (" + Arrays.toString(hiveTableCheck) + ") :" + stdout);
+    LOG.info("Got error output for (" + Arrays.toString(hiveTableCheck) + ") :" + stderr);
+    return Pair.of(stdout.trim(), stderr.trim());
+  }
+
+  protected void executeHiveCommandWithExpectedOutput(String hiveCommand, String expectedOutput) throws Exception {
+    Pair<String, String> stdOutErrPair = executeHiveCommand(hiveCommand, true);
+    System.out.println("Output : (" + stdOutErrPair.getLeft().replace("\n", ",") + ")");
+    Assert.assertEquals(expectedOutput, stdOutErrPair.getLeft());
+  }
+
   public class TestExecStartResultCallback extends ExecStartResultCallback {
 
     // Storing the reference in subclass to expose to clients
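The new helpers batch docker-exec'd commands and assert on Hive console output, logging stdout and stderr for each command. Roughly the hand-run equivalent of executeHiveCommand, assuming the pre-existing getHiveConsoleCommand wraps the Hive CLI as its name suggests:

    # Approximation only; the actual command array comes from getHiveConsoleCommand:
    docker exec hiveserver /bin/bash -c "hive -e 'show tables'"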
diff --git a/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestHoodieDemo.java b/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestHoodieDemo.java
new file mode 100644
index 0000000..bfbdc76
--- /dev/null
+++ b/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestHoodieDemo.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.integ;
+
+import com.google.common.collect.ImmutableList;
+import com.uber.hoodie.common.util.collection.Pair;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Goes through the steps described at https://hudi.incubator.apache.org/docker_demo.html,
+ * exercising only the Hive query side.
+ */
+public class ITTestHoodieDemo extends ITTestBase {
+
+  private static String HDFS_DATA_DIR = "/usr/hive/data/input";
+  private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/" + "batch_1.json";
+  private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/" + "batch_2.json";
+
+  private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT +
+      "/docker/demo/data/batch_1.json";
+  private static String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT +
+      "/docker/demo/data/batch_2.json";
+
+  private static String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
+  private static String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
+  private static String COW_TABLE_NAME = "stock_ticks_cow";
+  private static String MOR_TABLE_NAME = "stock_ticks_mor";
+
+  private static String DEMO_CONTAINER_SCRIPT = "/var/hoodie/ws/docker/demo/setup_demo_container.sh";
+  private static String HIVE_SYNC_TOOL = HOODIE_WS_ROOT + "/hoodie-hive/run_sync_tool.sh";
+  private static String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hoodie-cli/hoodie-cli.sh";
+  private static String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction_commands";
+
+  @Test
+  public void testDemo() throws Exception {
+    setupDemo();
+    ingestFirstBatchAndHiveSync();
+    testHiveAfterFirstBatch();
+    ingestSecondBatch();
+    testHiveAfterSecondBatch(false, true);
+    testIncrementalHiveQuery();
+    scheduleAndRunCompaction();
+    testHiveAfterSecondBatch(true, false);
+    testIncrementalHiveQuery();
+  }
+
+  private void setupDemo() throws Exception {
+    List<String[]> cmds = new ImmutableList.Builder<String[]>()
+        .add(new String[]{"hadoop", "fs", "-mkdir", "-p", HDFS_DATA_DIR})
+        .add(new String[]{"hadoop", "fs", "-copyFromLocal", "-f", INPUT_BATCH_PATH1, HDFS_BATCH_PATH1})
+        .add(new String[]{"/bin/bash", DEMO_CONTAINER_SCRIPT})
+        .build();
+    executeCommandsInDocker(ADHOC_1_CONTAINER, cmds);
+  }
+
+  private void ingestFirstBatchAndHiveSync() throws Exception {
+    List<String[]> cmds = new ImmutableList.Builder<String[]>()
+        .add(new String[]{"spark-submit", "--class", "com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer",
+            HUDI_UTILITIES_BUNDLE, "--storage-type", "COPY_ON_WRITE", "--source-class",
+            "com.uber.hoodie.utilities.sources.JsonDFSSource", "--source-ordering-field", "ts", "--target-base-path",
+            COW_BASE_PATH, "--target-table", COW_TABLE_NAME, "--props", "/var/demo/config/dfs-source.properties",
+            "--schemaprovider-class", "com.uber.hoodie.utilities.schema.FilebasedSchemaProvider"})
+        .add(new String[]{"spark-submit", "--class", "com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer",
+            HUDI_UTILITIES_BUNDLE, "--storage-type", "MERGE_ON_READ", "--source-class",
+            "com.uber.hoodie.utilities.sources.JsonDFSSource", "--source-ordering-field", "ts", "--target-base-path",
+            MOR_BASE_PATH, "--target-table", MOR_TABLE_NAME, "--props", "/var/demo/config/dfs-source.properties",
+            "--schemaprovider-class", "com.uber.hoodie.utilities.schema.FilebasedSchemaProvider",
+            "--disable-compaction"})
+        .add(new String[]{HIVE_SYNC_TOOL, "--jdbc-url", "jdbc:hive2://hiveserver:10000", "--user", "hive",
+            "--pass", "hive", "--partitioned-by", "dt", "--base-path", COW_BASE_PATH, "--database", "default",
+            "--table", COW_TABLE_NAME})
+        .add(new String[]{HIVE_SYNC_TOOL, "--jdbc-url", "jdbc:hive2://hiveserver:10000", "--user", "hive",
+            "--pass", "hive", "--partitioned-by", "dt", "--base-path", MOR_BASE_PATH, "--database", "default",
+            "--table", MOR_TABLE_NAME})
+        .build();
+
+    executeCommandsInDocker(ADHOC_1_CONTAINER, cmds);
+  }
+
+  private void testHiveAfterFirstBatch() throws Exception {
+    Pair<String, String> res = executeHiveCommand("show tables", true);
+    Assert.assertTrue(res.getLeft().contains("stock_ticks_cow"));
+    Assert.assertTrue(res.getLeft().contains("stock_ticks_mor"));
+    Assert.assertTrue(res.getLeft().contains("stock_ticks_mor_rt"));
+    executeHiveCommandWithExpectedOutput("show partitions stock_ticks_cow", "dt=2018-08-31");
+    executeHiveCommandWithExpectedOutput("show partitions stock_ticks_mor", "dt=2018-08-31");
+    executeHiveCommandWithExpectedOutput("show partitions stock_ticks_mor_rt", "dt=2018-08-31");
+    executeHiveCommandWithExpectedOutput(
+        "select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'",
+        "GOOG\t2018-08-31 10:29:00");
+    executeHiveCommandWithExpectedOutput(
+        "select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'",
+        "GOOG\t2018-08-31 10:29:00");
+    executeHiveCommandWithExpectedOutput(
+        "select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'",
+        "GOOG\t2018-08-31 10:29:00");
+    executeHiveCommandWithExpectedOutput(
+        "select symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG'",
+        "GOOG\t2018-08-31 09:59:00\t6330\t1230.5\t1230.02\n"
+            + "GOOG\t2018-08-31 10:29:00\t3391\t1230.1899\t1230.085");
+    executeHiveCommandWithExpectedOutput(
+        "select symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG'",
+        "GOOG\t2018-08-31 09:59:00\t6330\t1230.5\t1230.02\n"
+            + "GOOG\t2018-08-31 10:29:00\t3391\t1230.1899\t1230.085");
+    executeHiveCommandWithExpectedOutput(
+        "select symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG'",
+        "GOOG\t2018-08-31 09:59:00\t6330\t1230.5\t1230.02\n"
+            + "GOOG\t2018-08-31 10:29:00\t3391\t1230.1899\t1230.085");
+  }
+
+  private void ingestSecondBatch() throws Exception {
+    List<String[]> cmds = new ImmutableList.Builder<String[]>()
+        .add(new String[]{"hadoop", "fs", "-copyFromLocal", "-f", INPUT_BATCH_PATH2, HDFS_BATCH_PATH2})
+        .add(new String[]{"spark-submit", "--class", "com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer",
+            HUDI_UTILITIES_BUNDLE, "--storage-type", "COPY_ON_WRITE", "--source-class",
+            "com.uber.hoodie.utilities.sources.JsonDFSSource", "--source-ordering-field", "ts", "--target-base-path",
+            COW_BASE_PATH, "--target-table", COW_TABLE_NAME, "--props", "/var/demo/config/dfs-source.properties",
+            "--schemaprovider-class", "com.uber.hoodie.utilities.schema.FilebasedSchemaProvider"})
+        .add(new String[]{"spark-submit", "--class", "com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer",
+            HUDI_UTILITIES_BUNDLE, "--storage-type", "MERGE_ON_READ", "--source-class",
+            "com.uber.hoodie.utilities.sources.JsonDFSSource", "--source-ordering-field", "ts", "--target-base-path",
+            MOR_BASE_PATH, "--target-table", MOR_TABLE_NAME, "--props", "/var/demo/config/dfs-source.properties",
+            "--schemaprovider-class", "com.uber.hoodie.utilities.schema.FilebasedSchemaProvider",
+            "--disable-compaction"})
+        .build();
+    executeCommandsInDocker(ADHOC_1_CONTAINER, cmds);
+  }
+
+  private void testHiveAfterSecondBatch(boolean hasCompactionRun, boolean testCOWTable) throws Exception {
+
+    if (testCOWTable) {
+      executeHiveCommandWithExpectedOutput(
+          "select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'",
+          "GOOG\t2018-08-31 10:59:00");
+    }
+
+    if (hasCompactionRun) {
+      executeHiveCommandWithExpectedOutput(
+          "select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'",
+          "GOOG\t2018-08-31 10:59:00");
+    } else {
+      executeHiveCommandWithExpectedOutput(
+          "select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'",
+          "GOOG\t2018-08-31 10:29:00");
+    }
+
+    executeHiveCommandWithExpectedOutput(
+        "select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'",
+        "GOOG\t2018-08-31 10:59:00");
+
+    if (testCOWTable) {
+      executeHiveCommandWithExpectedOutput(
+          "select symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG'",
+          "GOOG\t2018-08-31 09:59:00\t6330\t1230.5\t1230.02\n"
+              + "GOOG\t2018-08-31 10:59:00\t9021\t1227.1993\t1227.215");
+    }
+
+    if (hasCompactionRun) {
+      executeHiveCommandWithExpectedOutput(
+          "select symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG'",
+          "GOOG\t2018-08-31 09:59:00\t6330\t1230.5\t1230.02\n"
+              + "GOOG\t2018-08-31 10:59:00\t9021\t1227.1993\t1227.215");
+    } else {
+      executeHiveCommandWithExpectedOutput(
+          "select symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG'",
+          "GOOG\t2018-08-31 09:59:00\t6330\t1230.5\t1230.02\n"
+              + "GOOG\t2018-08-31 10:29:00\t3391\t1230.1899\t1230.085");
+    }
+    executeHiveCommandWithExpectedOutput(
+        "select symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG'",
+        "GOOG\t2018-08-31 09:59:00\t6330\t1230.5\t1230.02\n"
+            + "GOOG\t2018-08-31 10:59:00\t9021\t1227.1993\t1227.215");
+  }
+
+  private void testIncrementalHiveQuery() throws Exception {
+    Pair<String, String> stdoutErr = executeHiveCommand("select min(`_hoodie_commit_time`) from stock_ticks_cow", true);
+    String minInstantTimeForCOW = stdoutErr.getLeft();
+    executeHiveCommandWithExpectedOutput("set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL; "
+            + "set hoodie.stock_ticks_cow.consume.max.commits=3; set hoodie.stock_ticks_cow.consume.start.timestamp="
+            + minInstantTimeForCOW
+            + "; select symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG' "
+            + "and `_hoodie_commit_time` > '" + minInstantTimeForCOW + "';",
+        "GOOG\t2018-08-31 10:59:00\t9021\t1227.1993\t1227.215");
+  }
+
+  private void scheduleAndRunCompaction() throws Exception {
+    List<String[]> cmds = new ImmutableList.Builder<String[]>()
+        .add(new String[]{HUDI_CLI_TOOL, "--cmdfile", COMPACTION_COMMANDS})
+        .build();
+    executeCommandsInDocker(ADHOC_1_CONTAINER, cmds);
+  }
+}
\ No newline at end of file
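The test mirrors the docker demo end to end: ingest batch 1 and Hive-sync, query all three tables, ingest batch 2, verify that the MOR read-optimized view lags until compaction runs and then catches up, checking the incremental query path along the way. To iterate on just this test, a sketch using failsafe's standard test filter:

    mvn verify -pl hoodie-integ-test -Dit.test=ITTestHoodieDemo -DskipUTs=true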
diff --git a/pom.xml b/pom.xml
index 96fef30..109863c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,6 +153,9 @@
     <codehaus-jackson.version>1.9.13</codehaus-jackson.version>
     <notice.dir>${project.basedir}</notice.dir>
     <notice.file>NOTICE.txt</notice.file>
+    <skipTests>false</skipTests>
+    <skipITs>${skipTests}</skipITs>
+    <skipUTs>${skipTests}</skipUTs>
   </properties>
 
   <scm>
@@ -297,6 +300,7 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <version>${maven-surefire-plugin.version}</version>
         <configuration>
+          <skip>${skipUTs}</skip>
           <!-- Sets the VM argument line used when unit tests are run. -->
           <argLine>${surefireArgLine}</argLine>
           <systemPropertyVariables>
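The property cascade keeps -DskipTests meaning "skip everything" while allowing either suite to be skipped on its own: skipITs is honored natively by failsafe, and skipUTs is wired into surefire's skip flag above. In practice:

    mvn install -DskipTests        # skips both unit and integration tests
    mvn test   -DskipITs=true      # unit tests only
    mvn verify -DskipUTs=true      # integration tests only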
diff --git a/tools/run_travis_tests.sh b/tools/run_travis_tests.sh
new file mode 100755
index 0000000..51a34e4
--- /dev/null
+++ b/tools/run_travis_tests.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+mode=$1
+
+if [ "$mode" = "unit" ];
+then
+  echo "Running Unit Tests"
+  mvn test -DskipITs=true -B
+elif [ "$mode" = "integration" ];
+then
+  echo "Running Integration Tests"
+  mvn verify -DskipUTs=true -B
+else
+  echo "Unknown mode $mode"
+  exit 1;
+fi
+