You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/28 00:31:54 UTC
[hudi] branch master updated: [HUDI-703] Add test for HoodieSyncCommand (#1774)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fa41921 [HUDI-703] Add test for HoodieSyncCommand (#1774)
fa41921 is described below
commit fa419213f62d2006aeea228180302024977feb16
Author: hongdd <jn...@163.com>
AuthorDate: Tue Jul 28 08:31:43 2020 +0800
[HUDI-703] Add test for HoodieSyncCommand (#1774)
---
docker/demo/config/hoodie-incr.properties | 31 ++++
docker/demo/config/hoodie-schema.avsc | 145 ++++++++++++++++
docker/demo/sync-validate.commands | 19 +++
hudi-cli/pom.xml | 17 ++
.../hudi/cli/commands/HoodieSyncCommand.java | 4 +-
.../org/apache/hudi/integ/HoodieTestHiveBase.java | 121 +++++++++++++
.../java/org/apache/hudi/integ/ITTestBase.java | 4 +-
.../org/apache/hudi/integ/ITTestHoodieSanity.java | 6 +-
.../integ/command/ITTestHoodieSyncCommand.java | 75 ++++++++
.../src/test/resources/hoodie-docker.properties | 18 ++
hudi-spark/run_hoodie_app.sh | 4 +-
.../src/test/java/HoodieJavaGenerateApp.java | 190 +++++++++++++++++++++
12 files changed, 625 insertions(+), 9 deletions(-)
diff --git a/docker/demo/config/hoodie-incr.properties b/docker/demo/config/hoodie-incr.properties
new file mode 100644
index 0000000..95a6627
--- /dev/null
+++ b/docker/demo/config/hoodie-incr.properties
@@ -0,0 +1,31 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+hoodie.upsert.shuffle.parallelism=2
+hoodie.insert.shuffle.parallelism=2
+hoodie.bulkinsert.shuffle.parallelism=2
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.partitionpath.field=partition
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:///var/hoodie/ws/docker/demo/config/hoodie-schema.avsc
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:///var/hoodie/ws/docker/demo/config/hoodie-schema.avsc
+hoodie.deltastreamer.source.hoodieincr.partition.fields=partition
+hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
+hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
+# hive sync
+hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
+hoodie.datasource.hive_sync.partition_fields=partition
\ No newline at end of file
diff --git a/docker/demo/config/hoodie-schema.avsc b/docker/demo/config/hoodie-schema.avsc
new file mode 100644
index 0000000..55e255f
--- /dev/null
+++ b/docker/demo/config/hoodie-schema.avsc
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "type": "record",
+ "name": "triprec",
+ "fields": [
+ {
+ "name": "timestamp",
+ "type": "double"
+ },
+ {
+ "name": "_row_key",
+ "type": "string"
+ },
+ {
+ "name": "rider",
+ "type": "string"
+ },
+ {
+ "name": "driver",
+ "type": "string"
+ },
+ {
+ "name": "begin_lat",
+ "type": "double"
+ },
+ {
+ "name": "begin_lon",
+ "type": "double"
+ },
+ {
+ "name": "end_lat",
+ "type": "double"
+ },
+ {
+ "name": "end_lon",
+ "type": "double"
+ },
+ {
+ "name": "distance_in_meters",
+ "type": "int"
+ },
+ {
+ "name": "seconds_since_epoch",
+ "type": "long"
+ },
+ {
+ "name": "weight",
+ "type": "float"
+ },
+ {
+ "name": "nation",
+ "type": "bytes"
+ },
+ {
+ "name": "current_date",
+ "type": {
+ "type": "int",
+ "logicalType": "date"
+ }
+ },
+ {
+ "name": "current_ts",
+ "type": {
+ "type": "long",
+ "logicalType": "timestamp-micros"
+ }
+ },
+ {
+ "name": "height",
+ "type": {
+ "type": "fixed",
+ "name": "abc",
+ "size": 5,
+ "logicalType": "decimal",
+ "precision": 10,
+ "scale": 6
+ }
+ },
+ {
+ "name": "city_to_state",
+ "type": {
+ "type": "map",
+ "values": "string"
+ }
+ },
+ {
+ "name": "fare",
+ "type": {
+ "type": "record",
+ "name": "fare",
+ "fields": [
+ {
+ "name": "amount",
+ "type": "double"
+ },
+ {
+ "name": "currency",
+ "type": "string"
+ }
+ ]
+ }
+ },
+ {
+ "name": "tip_history",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "tip_history",
+ "fields": [
+ {
+ "name": "amount",
+ "type": "double"
+ },
+ {
+ "name": "currency",
+ "type": "string"
+ }
+ ]
+ }
+ }
+ },
+ {
+ "name": "_hoodie_is_deleted",
+ "type": "boolean",
+ "default": false
+ }
+ ]
+}
\ No newline at end of file
diff --git a/docker/demo/sync-validate.commands b/docker/demo/sync-validate.commands
new file mode 100644
index 0000000..32c334e
--- /dev/null
+++ b/docker/demo/sync-validate.commands
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+connect --path /docker_hoodie_sync_valid_test
+commits sync --path /docker_hoodie_sync_valid_test_2
+sync validate --sourceDb default --targetDb default --hiveServerUrl jdbc:hive2://hiveserver:10000 --hiveUser hive --hivePass hive
\ No newline at end of file
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 13bc848..388e414 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -181,6 +181,23 @@
<type>test-jar</type>
</dependency>
+ <!-- hive -->
+ <dependency>
+ <groupId>${hive.groupid}</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
index e0ceb2e..66c2eb0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
@@ -74,9 +74,9 @@ public class HoodieSyncCommand implements CommandMarker {
}
String targetLatestCommit =
- targetTimeline.getInstants().iterator().hasNext() ? "0" : targetTimeline.lastInstant().get().getTimestamp();
+ targetTimeline.getInstants().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0";
String sourceLatestCommit =
- sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
+ sourceTimeline.getInstants().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0";
if (sourceLatestCommit != null
&& HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
new file mode 100644
index 0000000..e2e9276
--- /dev/null
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ;
+
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class to run cmd and generate data in hive.
+ */
+public class HoodieTestHiveBase extends ITTestBase {
+
+  /**
+   * Partitioning layouts supported by {@link #generateDataByHoodieJavaApp}.
+   */
+  protected enum PartitionType {
+    SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
+  }
+
+  private static final int DEFAULT_TIME_WAIT = 5000;
+  private static final String OVERWRITE_COMMIT_TYPE = "overwrite";
+  private static final String DOCKER_PROPERTIES_FILE = "hoodie-docker.properties";
+  private static final String TIME_WAIT_PROPERTY = "hoodie.hiveserver.time.wait";
+
+  /**
+   * Runs {@code HoodieJavaGenerateApp} in the adhoc docker container to write a sample Hoodie table with hive sync
+   * enabled, then asserts that the synced (snapshot) Hive table exists.
+   *
+   * @param hiveTableName   Hive table to sync into; also determines the HDFS base path of the Hoodie table
+   * @param tableType       Hoodie table type name, e.g. {@code COPY_ON_WRITE} or {@code MERGE_ON_READ}
+   * @param partitionType   partitioning layout passed through to the app
+   * @param commitType      Spark save mode passed as {@code --commit-type}; for {@code "overwrite"} the existing
+   *                        Hive table(s) are dropped first and verified to be gone
+   * @param hoodieTableName Hoodie table name passed as {@code --table-name}
+   * @throws Exception if a docker or hive command fails, or a dropped table is still listed
+   */
+  public void generateDataByHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType,
+      String commitType, String hoodieTableName) throws Exception {
+
+    String hdfsPath = getHDFSPath(hiveTableName);
+    String hdfsUrl = "hdfs://namenode" + hdfsPath;
+
+    Pair<String, String> stdOutErr;
+    if (OVERWRITE_COMMIT_TYPE.equals(commitType)) {
+      // Drop Table if it exists
+      try {
+        dropHiveTables(hiveTableName, tableType);
+      } catch (AssertionError ex) {
+        // In travis, sometimes, the hivemetastore is not ready even though we wait for the port to be up.
+        // Workaround: sleep for hoodie.hiveserver.time.wait ms (default 5000) and retry once.
+        Thread.sleep(getTimeWait());
+        dropHiveTables(hiveTableName, tableType);
+      }
+
+      // Ensure table does not exist
+      stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
+      if (!stdOutErr.getLeft().isEmpty()) {
+        throw new TableExistsException("Dropped table " + hiveTableName + " exists!");
+      }
+    }
+
+    // Run Hoodie Java App
+    String cmd = String.format("%s %s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s"
+        + " --commit-type %s --table-name %s", HOODIE_JAVA_APP, "HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL,
+        tableType, hiveTableName, commitType, hoodieTableName);
+    if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
+      cmd = cmd + " --use-multi-partition-keys";
+    } else if (partitionType == PartitionType.NON_PARTITIONED) {
+      cmd = cmd + " --non-partitioned";
+    }
+    executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
+
+    String snapshotTableName = getSnapshotTableName(tableType, hiveTableName);
+
+    // Ensure table does exist
+    stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName + "'");
+    assertEquals(snapshotTableName, stdOutErr.getLeft(), "Table exists");
+  }
+
+  /**
+   * Drops the Hive table(s) registered for {@code hiveTableName}: the {@code _rt} and {@code _ro} tables for
+   * MERGE_ON_READ, otherwise the plain table. Uses {@code drop table if exists}, so missing tables are ignored.
+   */
+  protected void dropHiveTables(String hiveTableName, String tableType) throws Exception {
+    if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
+      executeHiveCommand("drop table if exists " + hiveTableName + "_ro");
+    } else {
+      executeHiveCommand("drop table if exists " + hiveTableName);
+    }
+  }
+
+  /**
+   * Returns the HDFS base path used for the Hoodie table backing {@code hiveTableName}.
+   */
+  protected String getHDFSPath(String hiveTableName) {
+    return "/" + hiveTableName;
+  }
+
+  /**
+   * Returns the Hive table to query for a snapshot view: {@code <name>_rt} for MERGE_ON_READ, else {@code <name>}.
+   */
+  protected String getSnapshotTableName(String tableType, String hiveTableName) {
+    return tableType.equals(HoodieTableType.MERGE_ON_READ.name())
+        ? hiveTableName + "_rt" : hiveTableName;
+  }
+
+  /**
+   * Reads the hiveserver retry wait (milliseconds, passed to {@link Thread#sleep}) from
+   * {@code hoodie-docker.properties} on the test classpath, falling back to {@link #DEFAULT_TIME_WAIT}
+   * when the file is missing or cannot be read.
+   */
+  private int getTimeWait() {
+    try (InputStream stream = HoodieTestHiveBase.class.getClassLoader().getResourceAsStream(DOCKER_PROPERTIES_FILE)) {
+      if (stream == null) {
+        // getResourceAsStream reports a missing resource as null; Properties.load(null) would throw an NPE.
+        LOG.warn("Can not load property file, use default time wait for hiveserver.");
+        return DEFAULT_TIME_WAIT;
+      }
+      TypedProperties properties = new TypedProperties();
+      properties.load(stream);
+      return properties.getInteger(TIME_WAIT_PROPERTY, DEFAULT_TIME_WAIT);
+    } catch (IOException e) {
+      LOG.warn("Can not load property file, use default time wait for hiveserver.", e);
+      return DEFAULT_TIME_WAIT;
+    }
+  }
+}
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index cf87273..c2312d0 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -180,7 +180,7 @@ public abstract class ITTestBase {
}
}
- TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed)
+ protected TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed)
throws Exception {
LOG.info("\n\n#################################################################################################");
LOG.info("Container : " + containerName + ", Running command :" + cmd);
@@ -190,7 +190,7 @@ public abstract class ITTestBase {
return executeCommandInDocker(containerName, cmdSplits, expectedToSucceed);
}
- Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
+ protected Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
LOG.info("\n\n#################################################################################################");
LOG.info("Running hive command :" + hiveCommand);
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index 1d1ef8d..4b586a3 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -137,13 +137,13 @@ public class ITTestHoodieSanity extends ITTestBase {
// Run Hoodie Java App
String cmd;
if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
- cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName;
} else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
- cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys";
} else {
- cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
}
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
new file mode 100644
index 0000000..a6a4c3e
--- /dev/null
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.command;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieTableType;
+
+import org.apache.hudi.integ.HoodieTestHiveBase;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for HoodieSyncCommand in hudi-cli module.
+ */
+public class ITTestHoodieSyncCommand extends HoodieTestHiveBase {
+
+  private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
+  private static final String SYNC_VALIDATE_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sync-validate.commands";
+
+  /**
+   * Writes a source table, incrementally copies it into a second table with HoodieDeltaStreamer, appends to the
+   * source again and runs {@code sync validate} through hudi-cli. The test then asserts on the count difference
+   * (100) and catch-up count (200) that the CLI reports.
+   * The table names below must match those referenced by {@code hoodie-incr.properties} and
+   * {@code sync-validate.commands}.
+   */
+  @Test
+  public void testValidateSync() throws Exception {
+    String hiveTableName = "docker_hoodie_sync_valid_test";
+    String hiveTableName2 = "docker_hoodie_sync_valid_test_2";
+
+    try {
+      generateDataByHoodieJavaApp(
+          hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED, "overwrite", hiveTableName);
+
+      syncHoodieTable(hiveTableName2, "INSERT");
+
+      generateDataByHoodieJavaApp(
+          hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED, "append", hiveTableName);
+
+      TestExecStartResultCallback result =
+          executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " --cmdfile " + SYNC_VALIDATE_COMMANDS, true);
+
+      String expected = String.format("Count difference now is (count(%s) - count(%s) == %d. Catch up count is %d",
+          hiveTableName, hiveTableName2, 100, 200);
+      String stderr = result.getStderr().toString();
+      assertTrue(stderr.contains(expected),
+          "Expected hudi-cli output to contain [" + expected + "] but stderr was:\n" + stderr);
+    } finally {
+      // Drop the Hive tables even when an assertion or command fails, so a rerun starts from a clean metastore.
+      dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
+      dropHiveTables(hiveTableName2, HoodieTableType.COPY_ON_WRITE.name());
+    }
+  }
+
+  /**
+   * Runs HoodieDeltaStreamer with {@code HoodieIncrSource} to pull the source table configured in
+   * {@code hoodie-incr.properties} into a COPY_ON_WRITE table at {@code /<hiveTableName>}, with hive sync enabled.
+   *
+   * @param hiveTableName target table name; also used to derive the target base path
+   * @param op            DeltaStreamer write operation, e.g. {@code INSERT}
+   */
+  private void syncHoodieTable(String hiveTableName, String op) throws Exception {
+    StringBuilder cmdBuilder = new StringBuilder("spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 ")
+        .append(" --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer ").append(HUDI_UTILITIES_BUNDLE)
+        .append(" --table-type COPY_ON_WRITE ")
+        .append(" --base-file-format ").append(HoodieFileFormat.PARQUET.toString())
+        .append(" --source-class org.apache.hudi.utilities.sources.HoodieIncrSource --source-ordering-field timestamp ")
+        .append(" --target-base-path ").append(getHDFSPath(hiveTableName))
+        .append(" --target-table ").append(hiveTableName)
+        .append(" --op ").append(op)
+        .append(" --props file:///var/hoodie/ws/docker/demo/config/hoodie-incr.properties")
+        .append(" --enable-hive-sync");
+    executeCommandStringInDocker(ADHOC_1_CONTAINER, cmdBuilder.toString(), true);
+  }
+}
diff --git a/hudi-integ-test/src/test/resources/hoodie-docker.properties b/hudi-integ-test/src/test/resources/hoodie-docker.properties
new file mode 100644
index 0000000..d511370
--- /dev/null
+++ b/hudi-integ-test/src/test/resources/hoodie-docker.properties
@@ -0,0 +1,18 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.hiveserver.time.wait=5000
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh
index 7093c70..e2acc6c 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_app.sh
@@ -36,5 +36,5 @@ fi
OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
#TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
-echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp "$@"
+echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
diff --git a/hudi-spark/src/test/java/HoodieJavaGenerateApp.java b/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
new file mode 100644
index 0000000..64245e9
--- /dev/null
+++ b/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieDataSourceHelpers;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.hive.NonPartitionedExtractor;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.testutils.DataSourceTestUtils;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Sample app that writes one batch of 100 generated records into a Hoodie table through the Spark datasource,
+ * optionally syncing it to Hive. The docker integration tests launch it through run_hoodie_app.sh.
+ */
+public class HoodieJavaGenerateApp {
+  @Parameter(names = {"--table-path", "-p"}, description = "Path for Hoodie sample table")
+  private String tablePath = "file:///tmp/hoodie/sample-table";
+
+  @Parameter(names = {"--table-name", "-n"}, description = "Table name for Hoodie sample table")
+  private String tableName = "hoodie_test";
+
+  @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ")
+  private String tableType = HoodieTableType.COPY_ON_WRITE.name();
+
+  @Parameter(names = {"--hive-sync", "-hs"}, description = "Enable syncing to hive")
+  private Boolean enableHiveSync = false;
+
+  @Parameter(names = {"--hive-db", "-hd"}, description = "Hive database")
+  private String hiveDB = "default";
+
+  @Parameter(names = {"--hive-table", "-ht"}, description = "Hive table")
+  private String hiveTable = "hoodie_sample_test";
+
+  @Parameter(names = {"--hive-user", "-hu"}, description = "Hive username")
+  private String hiveUser = "hive";
+
+  @Parameter(names = {"--hive-password", "-hp"}, description = "Hive password")
+  private String hivePass = "hive";
+
+  @Parameter(names = {"--hive-url", "-hl"}, description = "Hive JDBC URL")
+  private String hiveJdbcUrl = "jdbc:hive2://localhost:10000";
+
+  @Parameter(names = {"--non-partitioned", "-np"}, description = "Use non-partitioned Table")
+  private Boolean nonPartitionedTable = false;
+
+  @Parameter(names = {"--use-multi-partition-keys", "-mp"}, description = "Use Multiple Partition Keys")
+  private Boolean useMultiPartitionKeys = false;
+
+  // Passed straight to DataFrameWriter.mode(...), so it is a Spark save mode, not a commit count.
+  @Parameter(names = {"--commit-type", "-ct"}, description = "Spark save mode for the write, e.g. overwrite or append")
+  private String commitType = "overwrite";
+
+  @Parameter(names = {"--help", "-h"}, help = true)
+  public Boolean help = false;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieJavaGenerateApp.class);
+
+  /**
+   * Parses the command line, prints usage and exits with status 1 when {@code --help} is given, otherwise performs
+   * a single insert inside a Spark session that is closed afterwards.
+   */
+  public static void main(String[] args) throws Exception {
+    HoodieJavaGenerateApp cli = new HoodieJavaGenerateApp();
+    JCommander cmd = new JCommander(cli, null, args);
+
+    if (cli.help) {
+      cmd.usage();
+      System.exit(1);
+    }
+    try (SparkSession spark = cli.getOrCreateSparkSession()) {
+      cli.insert(spark);
+    }
+  }
+
+  /**
+   * Builds (or reuses) a local, single-threaded Spark session using Kryo serialization and WARN log level.
+   */
+  private SparkSession getOrCreateSparkSession() {
+    // Spark session setup..
+    SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
+    spark.sparkContext().setLogLevel("WARN");
+    return spark;
+  }
+
+  /**
+   * Returns the record generator: a single empty partition path for non-partitioned tables, otherwise the
+   * generator's default partitions.
+   */
+  private HoodieTestDataGenerator getDataGenerate() {
+    // Generator of some records to be loaded in.
+    if (nonPartitionedTable) {
+      // All data goes to base-path
+      return new HoodieTestDataGenerator(new String[]{""});
+    } else {
+      return new HoodieTestDataGenerator();
+    }
+  }
+
+  /**
+   * Setup configs for syncing to hive. Returns {@code writer} unchanged when hive sync is disabled.
+   */
+  private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
+    if (enableHiveSync) {
+      LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+      writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
+          .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
+          .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
+          .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser)
+          .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass)
+          .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true");
+      if (nonPartitionedTable) {
+        writer = writer
+            .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
+                NonPartitionedExtractor.class.getCanonicalName())
+            .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
+      } else if (useMultiPartitionKeys) {
+        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day").option(
+            DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
+            MultiPartKeysValueExtractor.class.getCanonicalName());
+      } else {
+        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr");
+      }
+    }
+    return writer;
+  }
+
+  /**
+   * Generates 100 insert records and writes them to {@link #tablePath} with the configured table type and save
+   * mode ({@link #commitType}), then logs the table's latest commit time.
+   */
+  private void insert(SparkSession spark) throws IOException {
+    HoodieTestDataGenerator dataGen = getDataGenerate();
+
+    JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
+
+    // Generate some input..
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    List<HoodieRecord> recordsSoFar = new ArrayList<>(dataGen.generateInserts(instantTime/* ignore */, 100));
+    List<String> records1 = DataSourceTestUtils.convertToStringList(recordsSoFar);
+    Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
+
+    // Save as a hoodie dataset of the configured table type
+    // specify the hoodie source
+    DataFrameWriter<Row> writer = inputDF1.write().format("org.apache.hudi")
+        // any hoodie client config can be passed like this
+        .option("hoodie.insert.shuffle.parallelism", "2")
+        // full list in HoodieWriteConfig & its package
+        .option("hoodie.upsert.shuffle.parallelism", "2")
+        // Hoodie Table Type
+        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
+        // insert
+        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
+        // This is the record key
+        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
+        // this is the partition to place it into
+        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
+        // use to combine duplicate records in input/with disk val
+        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
+        // Used by hive sync and queries
+        .option(HoodieWriteConfig.TABLE_NAME, tableName)
+        // Add Key Extractor
+        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
+            nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
+                : SimpleKeyGenerator.class.getCanonicalName())
+        .mode(commitType);
+
+    // Use the returned writer instead of relying on option(...) mutating the instance in place.
+    writer = updateHiveSyncConfig(writer);
+    // new dataset if needed
+    writer.save(tablePath); // ultimately where the dataset will be placed
+    FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());
+    String commitInstantTime = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+    LOG.info("Commit at instant time :" + commitInstantTime);
+  }
+}