Posted to commits@hudi.apache.org by vb...@apache.org on 2020/08/05 14:50:23 UTC
[hudi] branch master updated: [HUDI-575] Spark Streaming with async
compaction support (#1752)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7a2429f [HUDI-575] Spark Streaming with async compaction support (#1752)
7a2429f is described below
commit 7a2429f5ba45e3c3c6f4e167fdc5b79be716febb
Author: Balaji Varadarajan <ba...@robinhood.com>
AuthorDate: Wed Aug 5 07:50:15 2020 -0700
[HUDI-575] Spark Streaming with async compaction support (#1752)
---
.../apache/hudi/async/AbstractAsyncService.java | 20 +-
.../org/apache/hudi/async/AsyncCompactService.java | 161 ++++++++++++
.../java/org/apache/hudi/client}/Compactor.java | 4 +-
.../common/testutils/HoodieTestDataGenerator.java | 20 ++
.../org/apache/hudi/integ/HoodieTestHiveBase.java | 4 +-
.../java/org/apache/hudi/integ/ITTestBase.java | 2 +
.../org/apache/hudi/integ/ITTestHoodieSanity.java | 48 ++--
hudi-spark/run_hoodie_app.sh | 2 +-
...un_hoodie_app.sh => run_hoodie_generate_app.sh} | 2 +-
...n_hoodie_app.sh => run_hoodie_streaming_app.sh} | 4 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 11 +-
.../async/SparkStreamingAsyncCompactService.java | 35 +++
.../scala/org/apache/hudi/DataSourceOptions.scala | 4 +
.../main/scala/org/apache/hudi/DefaultSource.scala | 4 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 277 ++++++++++++---------
.../org/apache/hudi/HoodieStreamingSink.scala | 104 ++++++--
hudi-spark/src/test/java/HoodieJavaApp.java | 3 +
.../src/test/java/HoodieJavaStreamingApp.java | 212 +++++++++++++---
.../functional/HoodieSparkSqlWriterSuite.scala | 3 +-
.../apache/hudi/functional/TestDataSource.scala | 41 ++-
.../deltastreamer/HoodieDeltaStreamer.java | 109 +-------
.../deltastreamer/SchedulerConfGenerator.java | 5 +-
22 files changed, 763 insertions(+), 312 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
index 7ac236d..714fa60 100644
--- a/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
@@ -48,9 +48,16 @@ public abstract class AbstractAsyncService implements Serializable {
private transient ExecutorService executor;
// Future tracking delta-sync/compaction
private transient CompletableFuture future;
+ // Run in daemon mode
+ private final boolean runInDaemonMode;
protected AbstractAsyncService() {
+ this(false);
+ }
+
+ protected AbstractAsyncService(boolean runInDaemonMode) {
shutdownRequested = false;
+ this.runInDaemonMode = runInDaemonMode;
}
protected boolean isShutdownRequested() {
@@ -129,7 +136,11 @@ public abstract class AbstractAsyncService implements Serializable {
*/
private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
LOG.info("Submitting monitor thread !!");
- Executors.newSingleThreadExecutor().submit(() -> {
+ Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "Monitor Thread");
+ t.setDaemon(isRunInDaemonMode());
+ return t;
+ }).submit(() -> {
boolean error = false;
try {
LOG.info("Monitoring thread(s) !!");
@@ -137,18 +148,21 @@ public abstract class AbstractAsyncService implements Serializable {
} catch (ExecutionException ex) {
LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
error = true;
- shutdown(false);
} catch (InterruptedException ie) {
LOG.error("Got interrupted Monitoring threads", ie);
error = true;
- shutdown(false);
} finally {
// Mark as shutdown
shutdown = true;
if (null != onShutdownCallback) {
onShutdownCallback.apply(error);
}
+ shutdown(false);
}
});
}
+
+ public boolean isRunInDaemonMode() {
+ return runInDaemonMode;
+ }
}
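For context on the daemon-mode change above: building the single-thread executor with a custom ThreadFactory lets the monitor thread be marked as a daemon, so it cannot keep the JVM alive after the main application exits. A minimal, self-contained sketch of the pattern (standard java.util.concurrent only; the class name is illustrative, not part of this commit):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class DaemonExecutorSketch {
        // Mirrors the thread factory added to monitorThreads() above:
        // the worker thread is optionally marked as a daemon.
        static ExecutorService newMonitorExecutor(boolean daemon) {
            return Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "Monitor Thread");
                t.setDaemon(daemon); // daemon threads do not block JVM shutdown
                return t;
            });
        }

        public static void main(String[] args) {
            newMonitorExecutor(true).submit(() -> System.out.println("monitoring..."));
            // With daemon=true the JVM may exit even while the task is running,
            // which is the behavior streaming-mode compaction relies on.
        }
    }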
diff --git a/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java
new file mode 100644
index 0000000..1c39bb6
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.Compactor;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
+
+/**
+ * Async Compactor Service that runs in a separate thread. Currently, only one compactor is allowed to run at any time.
+ */
+public class AsyncCompactService extends AbstractAsyncService {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
+
+ /**
+ * This is the job pool used by async compaction.
+ */
+ public static final String COMPACT_POOL_NAME = "hoodiecompact";
+
+ private final int maxConcurrentCompaction;
+ private transient Compactor compactor;
+ private transient JavaSparkContext jssc;
+ private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
+ private transient ReentrantLock queueLock = new ReentrantLock();
+ private transient Condition consumed = queueLock.newCondition();
+
+ public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
+ this(jssc, client, false);
+ }
+
+ public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client, boolean runInDaemonMode) {
+ super(runInDaemonMode);
+ this.jssc = jssc;
+ this.compactor = new Compactor(client, jssc);
+ this.maxConcurrentCompaction = 1;
+ }
+
+ /**
+ * Enqueues new Pending compaction.
+ */
+ public void enqueuePendingCompaction(HoodieInstant instant) {
+ pendingCompactions.add(instant);
+ }
+
+ /**
+ * Wait till the number of outstanding pending compactions reduces to the passed-in value.
+ *
+ * @param numPendingCompactions Maximum pending compactions allowed
+ * @throws InterruptedException
+ */
+ public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
+ try {
+ queueLock.lock();
+ while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
+ consumed.await();
+ }
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ /**
+ * Fetch Next pending compaction if available.
+ *
+ * @return
+ * @throws InterruptedException
+ */
+ private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
+ LOG.info("Compactor waiting for next instant for compaction upto 60 seconds");
+ HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS);
+ if (instant != null) {
+ try {
+ queueLock.lock();
+ // Signal waiting thread
+ consumed.signal();
+ } finally {
+ queueLock.unlock();
+ }
+ }
+ return instant;
+ }
+
+ /**
+ * Start Compaction Service.
+ */
+ @Override
+ protected Pair<CompletableFuture, ExecutorService> startService() {
+ ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction,
+ r -> {
+ Thread t = new Thread(r, "async_compact_thread");
+ t.setDaemon(isRunInDaemonMode());
+ return t;
+ });
+ return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
+ try {
+ // Set Compactor Pool Name for allowing users to prioritize compaction
+ LOG.info("Setting Spark Pool name for compaction to " + COMPACT_POOL_NAME);
+ jssc.setLocalProperty("spark.scheduler.pool", COMPACT_POOL_NAME);
+
+ while (!isShutdownRequested()) {
+ final HoodieInstant instant = fetchNextCompactionInstant();
+
+ if (null != instant) {
+ LOG.info("Starting Compaction for instant " + instant);
+ compactor.compact(instant);
+ LOG.info("Finished Compaction for instant " + instant);
+ }
+ }
+ LOG.info("Compactor shutting down properly!!");
+ } catch (InterruptedException ie) {
+ LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
+ } catch (IOException e) {
+ LOG.error("Compactor executor failed", e);
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ return true;
+ }, executor)).toArray(CompletableFuture[]::new)), executor);
+ }
+
+
+ /**
+ * Check whether compactor thread needs to be stopped.
+ * @return
+ */
+ protected boolean shouldStopCompactor() {
+ return false;
+ }
+}
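The service above implements a small producer/consumer handshake: enqueuePendingCompaction() feeds a LinkedBlockingQueue, waitTillPendingCompactionsReducesTo() blocks the producer on a Condition until the backlog shrinks, and fetchNextCompactionInstant() signals that Condition after each successful poll. A stripped-down sketch of just that handshake (names are illustrative, not Hudi API):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class BoundedPendingQueue<T> {
        private final BlockingQueue<T> pending = new LinkedBlockingQueue<>();
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition consumed = lock.newCondition();

        void enqueue(T item) {
            pending.add(item);
        }

        // Producer-side backpressure: block until the backlog shrinks to maxPending.
        void waitTillPendingReducesTo(int maxPending) throws InterruptedException {
            lock.lock();
            try {
                while (pending.size() > maxPending) {
                    consumed.await();
                }
            } finally {
                lock.unlock();
            }
        }

        // Consumer side: poll with a timeout, then wake any blocked producer.
        T fetchNext() throws InterruptedException {
            T item = pending.poll(10, TimeUnit.SECONDS);
            if (item != null) {
                lock.lock();
                try {
                    consumed.signal();
                } finally {
                    lock.unlock();
                }
            }
            return item;
        }
    }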
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java b/hudi-client/src/main/java/org/apache/hudi/client/Compactor.java
similarity index 94%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
rename to hudi-client/src/main/java/org/apache/hudi/client/Compactor.java
index 4c23537..1f11620 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/Compactor.java
@@ -16,10 +16,8 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.deltastreamer;
+package org.apache.hudi.client;
-import org.apache.hudi.client.HoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 1ead5ff..6b6074b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -599,6 +599,26 @@ public class HoodieTestDataGenerator {
return updates;
}
+ /**
+ * Generate update for each record in the dataset.
+ * @param instantTime
+ * @return
+ * @throws IOException
+ */
+ public List<HoodieRecord> generateUpdatesForAllRecords(String instantTime) {
+ List<HoodieRecord> updates = new ArrayList<>();
+ Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+ existingKeys.values().forEach(kp -> {
+ try {
+ HoodieRecord record = generateUpdateRecord(kp.key, instantTime);
+ updates.add(record);
+ } catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ });
+ return updates;
+ }
+
public List<HoodieRecord> generateUpdatesAsPerSchema(String commitTime, Integer n, String schemaStr) {
return generateUniqueUpdatesStream(commitTime, n, schemaStr).collect(Collectors.toList());
}
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
index 170ca60..95e4c01 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
@@ -72,8 +72,8 @@ public class HoodieTestHiveBase extends ITTestBase {
}
// Run Hoodie Java App
- String cmd = String.format("%s %s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s" +
- " --commit-type %s --table-name %s", HOODIE_JAVA_APP, "HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL,
+ String cmd = String.format("%s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s" +
+ " --commit-type %s --table-name %s", HOODIE_GENERATE_APP, hdfsUrl, HIVE_SERVER_JDBC_URL,
tableType, hiveTableName, commitType, hoodieTableName);
if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
cmd = cmd + " --use-multi-partition-keys";
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index a5e6ed9..3ffa0cf 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -60,6 +60,8 @@ public abstract class ITTestBase {
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
+ protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh";
+ protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh";
protected static final String HUDI_HADOOP_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar";
protected static final String HUDI_HIVE_SYNC_BUNDLE =
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index 4b586a3..c7787a7 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -23,11 +23,12 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-
/**
* Smoke tests to run as part of verification.
*/
@@ -37,27 +38,31 @@ public class ITTestHoodieSanity extends ITTestBase {
SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
}
- @Test
+ @ParameterizedTest
+ @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
/**
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with single partition key data-set
* and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
* console.
*/
- public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
+ public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_cow_test";
- testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED);
+ testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
+ PartitionType.SINGLE_KEY_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}
- @Test
+ @ParameterizedTest
+ @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
/**
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with multiple partition-keys
* data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
* in hive console.
*/
- public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception {
+ public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test";
- testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.MULTI_KEYS_PARTITIONED);
+ testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
+ PartitionType.MULTI_KEYS_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}
@@ -73,27 +78,31 @@ public class ITTestHoodieSanity extends ITTestBase {
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}
- @Test
+ @ParameterizedTest
+ @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
/**
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set
* and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
* console.
*/
- public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception {
+ public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_mor_test";
- testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.SINGLE_KEY_PARTITIONED);
+ testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
+ PartitionType.SINGLE_KEY_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}
- @Test
+ @ParameterizedTest
+ @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
/**
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with multiple partition-keys
* data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
* in hive console.
*/
- public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable() throws Exception {
+ public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_mor_test";
- testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.MULTI_KEYS_PARTITIONED);
+ testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
+ PartitionType.MULTI_KEYS_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}
@@ -114,7 +123,7 @@ public class ITTestHoodieSanity extends ITTestBase {
* Hive integration and upsert functionality is checked by running a count query in hive console. TODO: Add
* spark-shell test-case
*/
- public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType)
+ public void testRunHoodieJavaApp(String command, String hiveTableName, String tableType, PartitionType partitionType)
throws Exception {
String hdfsPath = "/" + hiveTableName;
@@ -137,13 +146,13 @@ public class ITTestHoodieSanity extends ITTestBase {
// Run Hoodie Java App
String cmd;
if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
- cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName;
} else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
- cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys";
} else {
- cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
}
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
@@ -182,6 +191,11 @@ public class ITTestHoodieSanity extends ITTestBase {
"Expecting 280 rows to be present in the new table");
}
+ public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType)
+ throws Exception {
+ testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, tableType, partitionType);
+ }
+
private void dropHiveTables(String hiveTableName, String tableType) throws Exception {
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
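The test changes above replace plain @Test methods with JUnit 5 parameterized tests, so each sanity test now runs once per launcher script (batch and streaming). A minimal, standalone example of the same mechanism (hypothetical test, not part of this commit; @ValueSource entries must be compile-time constants, which is why the script paths are static final String fields in ITTestBase):

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;
    import static org.junit.jupiter.api.Assertions.assertTrue;

    class LauncherSanityTest {
        static final String BATCH_APP = "/var/hoodie/ws/hudi-spark/run_hoodie_app.sh";
        static final String STREAMING_APP = "/var/hoodie/ws/hudi-spark/run_hoodie_streaming_app.sh";

        // Runs once per script, exactly like the ITTestHoodieSanity tests above.
        @ParameterizedTest
        @ValueSource(strings = {BATCH_APP, STREAMING_APP})
        void launcherPathLooksValid(String command) {
            assertTrue(command.endsWith(".sh"));
        }
    }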
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh
index e2acc6c..7c63e74 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_app.sh
@@ -37,4 +37,4 @@ fi
OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
#TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp "$@"
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_generate_app.sh
similarity index 98%
copy from hudi-spark/run_hoodie_app.sh
copy to hudi-spark/run_hoodie_generate_app.sh
index e2acc6c..a4b4090 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_generate_app.sh
@@ -37,4 +37,4 @@ fi
OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
#TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaGenerateApp "$@"
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_streaming_app.sh
similarity index 95%
copy from hudi-spark/run_hoodie_app.sh
copy to hudi-spark/run_hoodie_streaming_app.sh
index e2acc6c..01f1a4e 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_streaming_app.sh
@@ -36,5 +36,5 @@ fi
OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
#TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
-echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
+echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaStreamingApp $@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaStreamingApp "$@"
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 3345204..5647e65 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -226,11 +226,16 @@ public class DataSourceUtils {
}
public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
- String tblName, Map<String, String> parameters) {
-
+ String tblName, Map<String, String> parameters) {
+ boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY()));
// inline compaction is on by default for MOR
- boolean inlineCompact = parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY())
+ boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY())
.equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+ return createHoodieClient(jssc, schemaStr, basePath, tblName, parameters, inlineCompact);
+ }
+
+ public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
+ String tblName, Map<String, String> parameters, boolean inlineCompact) {
// insert/bulk-insert combining to be true, if filtering for duplicates
boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY()));
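The effect of the change above: when async compaction is requested, it suppresses the inline-compaction default for MERGE_ON_READ tables, so the two modes are mutually exclusive. A small sketch of that decision (the async key string is copied from this commit; the table-type key and value are taken from DataSourceWriteOptions as of this version and should be treated as assumptions):

    import java.util.HashMap;
    import java.util.Map;

    public class CompactionModeSketch {
        public static void main(String[] args) {
            Map<String, String> parameters = new HashMap<>();
            parameters.put("hoodie.datasource.write.table.type", "MERGE_ON_READ");
            parameters.put("hoodie.datasource.compaction.async.enable", "true");

            boolean asyncCompact = Boolean.parseBoolean(
                parameters.get("hoodie.datasource.compaction.async.enable"));
            // Inline compaction applies only to MOR tables, and only when async is off.
            boolean inlineCompact = !asyncCompact
                && "MERGE_ON_READ".equals(parameters.get("hoodie.datasource.write.table.type"));

            System.out.println("async=" + asyncCompact + ", inline=" + inlineCompact);
            // Prints: async=true, inline=false
        }
    }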
diff --git a/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java b/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
new file mode 100644
index 0000000..ae0ad73
--- /dev/null
+++ b/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Async Compaction Service used by Structured Streaming. Here, async compaction runs in daemon mode so that it
+ * does not block shutdown of the Spark application.
+ */
+public class SparkStreamingAsyncCompactService extends AsyncCompactService {
+
+ private static final long serialVersionUID = 1L;
+
+ public SparkStreamingAsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
+ super(jssc, client, true);
+ }
+}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 8a8f87f..c505ec4 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -281,4 +281,8 @@ object DataSourceWriteOptions {
val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false"
val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
+
+ // Async Compaction - Enabled by default for MOR
+ val ASYNC_COMPACT_ENABLE_KEY = "hoodie.datasource.compaction.async.enable"
+ val DEFAULT_ASYNC_COMPACT_ENABLE_VAL = "true"
}
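A hedged usage sketch for the new option on a structured-streaming write (option names are taken from this diff; the "hudi" format alias, paths, and table name are placeholder assumptions, and a real writer would also need record-key and precombine options):

    import org.apache.hudi.DataSourceWriteOptions;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.DataStreamWriter;

    public class AsyncCompactionOptionSketch {
        // Configure a streaming writer: async compaction is on by default for MOR
        // and can be toggled explicitly via the new key added in this commit.
        static DataStreamWriter<Row> configure(Dataset<Row> df) {
            return df.writeStream()
                .format("hudi") // format alias is an assumption for this sketch
                .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(),
                    DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
                .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "true")
                .option(HoodieWriteConfig.TABLE_NAME, "sample_table")
                .option("checkpointLocation", "/tmp/hoodie/streaming/checkpoint");
        }
    }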
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index fbdd4ea..e26c1c8 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,6 +18,8 @@
package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions._
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
@@ -103,10 +105,8 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
-
val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
-
new HudiEmptyRelation(sqlContext, df.schema)
}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 05ef863..479005b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,6 +21,7 @@ import java.util
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
@@ -29,7 +30,7 @@ import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
@@ -49,7 +50,13 @@ private[hudi] object HoodieSparkSqlWriter {
def write(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
- df: DataFrame): (Boolean, common.util.Option[String]) = {
+ df: DataFrame,
+ hoodieTableConfig: Option[HoodieTableConfig] = Option.empty,
+ hoodieWriteClient: Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
+ asyncCompactionTriggerFn: Option[Function1[HoodieWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
+ )
+ : (Boolean, common.util.Option[String], common.util.Option[String],
+ HoodieWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
val sparkContext = sqlContext.sparkContext
val path = parameters.get("path")
@@ -84,113 +91,134 @@ private[hudi] object HoodieSparkSqlWriter {
val instantTime = HoodieActiveTimeline.createNewInstantTime()
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-
- if (exists && mode == SaveMode.Append) {
- val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
- if (!existingTableName.equals(tblName)) {
- throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
- }
+ var tableConfig : HoodieTableConfig = if (exists) {
+ hoodieTableConfig.getOrElse(
+ new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig)
+ } else {
+ null
}
- val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
- if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
- // register classes & schemas
- val structName = s"${tblName}_record"
- val nameSpace = s"hoodie.${tblName}"
- sparkContext.getConf.registerKryoClasses(
- Array(classOf[org.apache.avro.generic.GenericData],
- classOf[org.apache.avro.Schema]))
- val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
- sparkContext.getConf.registerAvroSchemas(schema)
- log.info(s"Registered avro schema : ${schema.toString(true)}")
-
- // Convert to RDD[HoodieRecord]
- val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
- val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
- val hoodieAllIncomingRecords = genericRecords.map(gr => {
- val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
- .asInstanceOf[Comparable[_]]
- DataSourceUtils.createHoodieRecord(gr,
- orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
- }).toJavaRDD()
-
- // Handle various save modes
- if (mode == SaveMode.ErrorIfExists && exists) {
- throw new HoodieException(s"hoodie table at $basePath already exists.")
- }
- if (mode == SaveMode.Ignore && exists) {
- log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
- (true, common.util.Option.empty())
- }
- if (mode == SaveMode.Overwrite && exists) {
- log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
- fs.delete(basePath, true)
- exists = false
- }
-
- // Create the table if not present
- if (!exists) {
- //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
- HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path.get, HoodieTableType.valueOf(tableType),
- tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
- }
-
- // Create a HoodieWriteClient & issue the write.
- val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
- mapAsJavaMap(parameters)
- )
-
- val hoodieRecords =
- if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
- DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
- } else {
- hoodieAllIncomingRecords
- }
-
- if (hoodieRecords.isEmpty()) {
- log.info("new batch has no new records, skipping...")
- (true, common.util.Option.empty())
- }
- client.startCommitWithTime(instantTime)
- val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
- (writeStatuses, client)
+ if (mode == SaveMode.Ignore && exists) {
+ log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+ (false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
-
- // Handle save modes
- if (mode != SaveMode.Append) {
- throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
+ if (exists && mode == SaveMode.Append) {
+ val existingTableName = tableConfig.getTableName
+ if (!existingTableName.equals(tblName)) {
+ throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
+ }
}
+ val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
+ if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+ // register classes & schemas
+ val structName = s"${tblName}_record"
+ val nameSpace = s"hoodie.${tblName}"
+ sparkContext.getConf.registerKryoClasses(
+ Array(classOf[org.apache.avro.generic.GenericData],
+ classOf[org.apache.avro.Schema]))
+ val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+ sparkContext.getConf.registerAvroSchemas(schema)
+ log.info(s"Registered avro schema : ${schema.toString(true)}")
+
+ // Convert to RDD[HoodieRecord]
+ val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
+ val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+ val hoodieAllIncomingRecords = genericRecords.map(gr => {
+ val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
+ .asInstanceOf[Comparable[_]]
+ DataSourceUtils.createHoodieRecord(gr,
+ orderingVal, keyGenerator.getKey(gr),
+ parameters(PAYLOAD_CLASS_OPT_KEY))
+ }).toJavaRDD()
+
+ // Handle various save modes
+ if (mode == SaveMode.ErrorIfExists && exists) {
+ throw new HoodieException(s"hoodie table at $basePath already exists.")
+ }
+
+ if (mode == SaveMode.Overwrite && exists) {
+ log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+ fs.delete(basePath, true)
+ exists = false
+ }
+
+ // Create the table if not present
+ if (!exists) {
+ //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+ val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+ path.get, HoodieTableType.valueOf(tableType),
+ tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+ tableConfig = tableMetaClient.getTableConfig
+ }
+
+ // Create a HoodieWriteClient & issue the write.
+ val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
+ tblName, mapAsJavaMap(parameters)
+ )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+
+ if (asyncCompactionTriggerFn.isDefined &&
+ isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+ asyncCompactionTriggerFn.get.apply(client)
+ }
+
+ val hoodieRecords =
+ if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+ DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+ } else {
+ hoodieAllIncomingRecords
+ }
- val structName = s"${tblName}_record"
- val nameSpace = s"hoodie.${tblName}"
- sparkContext.getConf.registerKryoClasses(
- Array(classOf[org.apache.avro.generic.GenericData],
- classOf[org.apache.avro.Schema]))
-
- // Convert to RDD[HoodieKey]
- val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
- val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
- val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
-
- if (!exists) {
- throw new HoodieException(s"hoodie table at $basePath does not exist")
- }
+ if (hoodieRecords.isEmpty()) {
+ log.info("new batch has no new records, skipping...")
+ (true, common.util.Option.empty())
+ }
+ client.startCommitWithTime(instantTime)
+ val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
+ (writeStatuses, client)
+ } else {
- // Create a HoodieWriteClient & issue the delete.
- val client = DataSourceUtils.createHoodieClient(jsc,
- Schema.create(Schema.Type.NULL).toString, path.get, tblName,
- mapAsJavaMap(parameters)
- )
+ // Handle save modes
+ if (mode != SaveMode.Append) {
+ throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
+ }
+
+ val structName = s"${tblName}_record"
+ val nameSpace = s"hoodie.${tblName}"
+ sparkContext.getConf.registerKryoClasses(
+ Array(classOf[org.apache.avro.generic.GenericData],
+ classOf[org.apache.avro.Schema]))
+
+ // Convert to RDD[HoodieKey]
+ val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
+ val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+ val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
+
+ if (!exists) {
+ throw new HoodieException(s"hoodie table at $basePath does not exist")
+ }
+
+ // Create a HoodieWriteClient & issue the delete.
+ val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
+ Schema.create(Schema.Type.NULL).toString, path.get, tblName,
+ mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+
+ if (asyncCompactionTriggerFn.isDefined &&
+ isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+ asyncCompactionTriggerFn.get.apply(client)
+ }
+
+ // Issue deletes
+ client.startCommitWithTime(instantTime)
+ val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
+ (writeStatuses, client)
+ }
- // Issue deletes
- client.startCommitWithTime(instantTime)
- val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
- (writeStatuses, client)
+ // Check for errors and commit the write.
+ val (writeSuccessful, compactionInstant) =
+ commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath,
+ operation, jsc)
+ (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
}
-
- // Check for errors and commit the write.
- val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, instantTime, basePath, operation, jsc)
- (writeSuccessful, common.util.Option.ofNullable(instantTime))
}
/**
@@ -222,7 +250,8 @@ private[hudi] object HoodieSparkSqlWriter {
HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL,
- HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL
+ HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL,
+ ASYNC_COMPACT_ENABLE_KEY -> DEFAULT_ASYNC_COMPACT_ENABLE_VAL
) ++ translateStorageTypeToTableType(parameters)
}
@@ -258,13 +287,14 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig
}
- private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus],
- parameters: Map[String, String],
- client: HoodieWriteClient[_],
- instantTime: String,
- basePath: Path,
- operation: String,
- jsc: JavaSparkContext): Boolean = {
+ private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
+ parameters: Map[String, String],
+ client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
+ tableConfig: HoodieTableConfig,
+ instantTime: String,
+ basePath: Path,
+ operation: String,
+ jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = {
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
@@ -284,6 +314,15 @@ private[hudi] object HoodieSparkSqlWriter {
log.info("Commit " + instantTime + " failed!")
}
+ val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
+ val compactionInstant : common.util.Option[java.lang.String] =
+ if (asyncCompactionEnabled) {
+ client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+ } else {
+ common.util.Option.empty()
+ }
+
+ log.info(s"Compaction Scheduled is $compactionInstant")
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
@@ -292,8 +331,12 @@ private[hudi] object HoodieSparkSqlWriter {
} else {
true
}
- client.close()
- commitSuccess && syncHiveSucess
+
+ log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
+ if (!asyncCompactionEnabled) {
+ client.close()
+ }
+ (commitSuccess && syncHiveSucess, compactionInstant)
} else {
log.error(s"$operation failed with $errorCount errors :")
if (log.isTraceEnabled) {
@@ -308,6 +351,18 @@ private[hudi] object HoodieSparkSqlWriter {
}
})
}
+ (false, common.util.Option.empty())
+ }
+ }
+
+ private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
+ tableConfig: HoodieTableConfig,
+ parameters: Map[String, String], configuration: Configuration) : Boolean = {
+ log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
+ if (!client.getConfig.isInlineCompaction
+ && parameters.get(ASYNC_COMPACT_ENABLE_KEY).exists(r => r.toBoolean)) {
+ tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
+ } else {
false
}
}
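HoodieSparkSqlWriter.write() now also returns the scheduled compaction instant, and scheduling is gated by isAsyncCompactionEnabled(): inline compaction must be off in the write config, the async option must be set to true, and the table must be MERGE_ON_READ. A condensed restatement of that predicate (plain values stand in for the HoodieWriteConfig and HoodieTableConfig objects; names are illustrative):

    public class AsyncCompactionGateSketch {
        static boolean isAsyncCompactionEnabled(boolean inlineCompaction,
                                                String asyncOptionValue,
                                                String tableType) {
            // All three conditions from the Scala predicate above must hold.
            return !inlineCompaction
                && Boolean.parseBoolean(asyncOptionValue)
                && "MERGE_ON_READ".equals(tableType);
        }

        public static void main(String[] args) {
            System.out.println(isAsyncCompactionEnabled(false, "true", "MERGE_ON_READ")); // true
            System.out.println(isAsyncCompactionEnabled(true, "true", "MERGE_ON_READ"));  // false
        }
    }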
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 9f18a2e..1600ab0 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -16,13 +16,25 @@
*/
package org.apache.hudi
+import java.lang
+import java.util.function.{Function, Supplier}
+
+import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService}
+import org.apache.hudi.client.HoodieWriteClient
+import org.apache.hudi.common.model.HoodieRecordPayload
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.CompactionUtils
import org.apache.hudi.exception.HoodieCorruptedDataException
import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import scala.util.{Failure, Success, Try}
+import scala.collection.JavaConversions._
class HoodieStreamingSink(sqlContext: SQLContext,
options: Map[String, String],
@@ -38,6 +50,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong
private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY).toBoolean
+ private var isAsyncCompactorServiceShutdownAbnormally = false
+
private val mode =
if (outputMode == OutputMode.Append()) {
SaveMode.Append
@@ -45,39 +59,54 @@ class HoodieStreamingSink(sqlContext: SQLContext,
SaveMode.Overwrite
}
- override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ private var asyncCompactorService : AsyncCompactService = _
+ private var writeClient : Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
+ private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty
+
+ override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized {
+ if (isAsyncCompactorServiceShutdownAbnormally) {
+ throw new IllegalStateException("Async Compactor shutdown unexpectedly")
+ }
+
retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
- sqlContext,
- mode,
- options,
- data)
+ sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor))
) match {
- case Success((true, commitOps)) =>
+ case Success((true, commitOps, compactionInstantOps, client, tableConfig)) =>
log.info(s"Micro batch id=$batchId succeeded"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
case _ => s" with no new commits"
}))
- Success((true, commitOps))
+ writeClient = Some(client)
+ hoodieTableConfig = Some(tableConfig)
+ if (compactionInstantOps.isPresent) {
+ asyncCompactorService.enqueuePendingCompaction(
+ new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
+ }
+ Success((true, commitOps, compactionInstantOps))
case Failure(e) =>
// clean up persist rdds in the write process
data.sparkSession.sparkContext.getPersistentRDDs
.foreach {
case (id, rdd) =>
- rdd.unpersist()
+ try {
+ rdd.unpersist()
+ } catch {
+ case t: Exception => log.warn("Got exception trying to unpersist rdd", t)
+ }
}
- log.error(s"Micro batch id=$batchId threw following expection: ", e)
+ log.error(s"Micro batch id=$batchId threw following exception: ", e)
if (ignoreFailedBatch) {
log.info(s"Ignore the exception and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration")
- Success((true, None))
+ Success((true, None, None))
} else {
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(e)
}
- case Success((false, commitOps)) =>
+ case Success((false, commitOps, compactionInstantOps, client, tableConfig)) =>
log.error(s"Micro batch id=$batchId ended up with errors"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
@@ -86,7 +115,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
if (ignoreFailedBatch) {
log.info(s"Ignore the errors and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration")
- Success((true, None))
+ Success((true, None, None))
} else {
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
@@ -100,6 +129,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
// spark sometimes hangs upon exceptions and keep on hold of the executors
// this is to force exit upon errors / exceptions and release all executors
// will require redeployment / supervise mode to restart the streaming
+ reset(true)
System.exit(1)
}
case Success(_) =>
@@ -112,11 +142,55 @@ class HoodieStreamingSink(sqlContext: SQLContext,
@annotation.tailrec
private def retry[T](n: Int, waitInMillis: Long)(fn: => Try[T]): Try[T] = {
fn match {
- case x: util.Success[T] => x
+ case x: Success[T] =>
+ x
case _ if n > 1 =>
Thread.sleep(waitInMillis)
retry(n - 1, waitInMillis * 2)(fn)
- case f => f
+ case f =>
+ reset(false)
+ f
+ }
+ }
+
+ protected def triggerAsyncCompactor(client: HoodieWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
+ if (null == asyncCompactorService) {
+ log.info("Triggering Async compaction !!")
+ asyncCompactorService = new SparkStreamingAsyncCompactService(new JavaSparkContext(sqlContext.sparkContext),
+ client)
+ asyncCompactorService.start(new Function[java.lang.Boolean, java.lang.Boolean] {
+ override def apply(errored: lang.Boolean): lang.Boolean = {
+ log.info(s"Async Compactor shutdown. Errored ? $errored")
+ isAsyncCompactorServiceShutdownAbnormally = errored
+ reset(false)
+ log.info("Done resetting write client.")
+ true
+ }
+ })
+
+ // Add Shutdown Hook
+ Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+ override def run(): Unit = reset(true)
+ }))
+
+ // First time, scan .hoodie folder and get all pending compactions
+ val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration,
+ client.getConfig.getBasePath)
+ val pendingInstants :java.util.List[HoodieInstant] =
+ CompactionUtils.getPendingCompactionInstantTimes(metaClient)
+ pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h))
+ }
+ }
+
+ private def reset(force: Boolean) : Unit = this.synchronized {
+ if (asyncCompactorService != null) {
+ asyncCompactorService.shutdown(force)
+ asyncCompactorService = null
+ }
+
+ if (writeClient.isDefined) {
+ writeClient.get.close()
+ writeClient = Option.empty
}
}
}
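On its first micro-batch, the sink above lazily starts the async compactor, installs a JVM shutdown hook that calls reset(true), and seeds the compaction queue from the table's timeline so a restarted stream resumes compactions scheduled before the last shutdown. A sketch of just that recovery step (calls as used in the diff; error handling omitted, class and method names are illustrative):

    import java.util.List;
    import java.util.function.Consumer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.CompactionUtils;

    public class PendingCompactionRecoverySketch {
        // Scan the .hoodie timeline and re-enqueue every pending compaction,
        // e.g. into AsyncCompactService#enqueuePendingCompaction.
        static void seedPendingCompactions(Configuration hadoopConf, String basePath,
                                           Consumer<HoodieInstant> enqueue) {
            HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
            List<HoodieInstant> pendingInstants =
                CompactionUtils.getPendingCompactionInstantTimes(metaClient);
            pendingInstants.forEach(enqueue);
        }
    }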
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java
index 2cf36f9..6eda051 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -151,6 +151,7 @@ public class HoodieJavaApp {
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
: SimpleKeyGenerator.class.getCanonicalName())
+ .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false")
// This will remove any existing data at path below, and create a
.mode(SaveMode.Overwrite);
@@ -177,6 +178,7 @@ public class HoodieJavaApp {
nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
: SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
+ .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false")
.option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append);
updateHiveSyncConfig(writer);
@@ -202,6 +204,7 @@ public class HoodieJavaApp {
nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
: SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
+ .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false")
.option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append);
updateHiveSyncConfig(writer);
diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 977ae09..500189d 100644
--- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -16,12 +16,18 @@
* limitations under the License.
*/
+import java.util.stream.Collectors;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import com.beust.jcommander.JCommander;
@@ -43,6 +49,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.spark.sql.streaming.StreamingQuery;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
@@ -52,14 +59,14 @@ import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrin
public class HoodieJavaStreamingApp {
@Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie sample table")
- private String tablePath = "file:///tmp/hoodie/streaming/sample-table";
+ private String tablePath = "/tmp/hoodie/streaming/sample-table";
@Parameter(names = {"--streaming-source-path", "-ssp"}, description = "path for streaming source file folder")
- private String streamingSourcePath = "file:///tmp/hoodie/streaming/source";
+ private String streamingSourcePath = "/tmp/hoodie/streaming/source";
@Parameter(names = {"--streaming-checkpointing-path", "-scp"},
description = "path for streaming checking pointing folder")
- private String streamingCheckpointingPath = "file:///tmp/hoodie/streaming/checkpoint";
+ private String streamingCheckpointingPath = "/tmp/hoodie/streaming/checkpoint";
@Parameter(names = {"--streaming-duration-in-ms", "-sdm"},
description = "time in millisecond for the streaming duration")
@@ -106,7 +113,15 @@ public class HoodieJavaStreamingApp {
cmd.usage();
System.exit(1);
}
- cli.run();
+ int errStatus = 0;
+ try {
+ cli.run();
+ } catch (Exception ex) {
+ LOG.error("Got error running app ", ex);
+ errStatus = -1;
+ } finally {
+ System.exit(errStatus);
+ }
}
/**
@@ -132,38 +147,118 @@ public class HoodieJavaStreamingApp {
List<String> records1 = recordsToStrings(dataGen.generateInserts("001", 100));
Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
- List<String> records2 = recordsToStrings(dataGen.generateUpdates("002", 100));
-
+ List<String> records2 = recordsToStrings(dataGen.generateUpdatesForAllRecords("002"));
Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(records2, 2));
- // setup the input for streaming
- Dataset<Row> streamingInput = spark.readStream().schema(inputDF1.schema()).json(streamingSourcePath);
+ String ckptPath = streamingCheckpointingPath + "/stream1";
+ String srcPath = streamingSourcePath + "/stream1";
+ fs.mkdirs(new Path(ckptPath));
+ fs.mkdirs(new Path(srcPath));
+
+ // setup the input for streaming
+ Dataset<Row> streamingInput = spark.readStream().schema(inputDF1.schema()).json(srcPath + "/*");
// start streaming and showing
ExecutorService executor = Executors.newFixedThreadPool(2);
+ int numInitialCommits = 0;
// thread for spark structured streaming
- Future<Void> streamFuture = executor.submit(() -> {
- LOG.info("===== Streaming Starting =====");
- stream(streamingInput);
- LOG.info("===== Streaming Ends =====");
- return null;
- });
-
- // thread for adding data to the streaming source and showing results over time
- Future<Void> showFuture = executor.submit(() -> {
- LOG.info("===== Showing Starting =====");
- show(spark, fs, inputDF1, inputDF2);
- LOG.info("===== Showing Ends =====");
- return null;
- });
-
- // let the threads run
- streamFuture.get();
- showFuture.get();
-
- executor.shutdown();
+ try {
+ Future<Void> streamFuture = executor.submit(() -> {
+ LOG.info("===== Streaming Starting =====");
+ stream(streamingInput, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL(), ckptPath);
+ LOG.info("===== Streaming Ends =====");
+ return null;
+ });
+
+ // thread for adding data to the streaming source and showing results over time
+ Future<Integer> showFuture = executor.submit(() -> {
+ LOG.info("===== Showing Starting =====");
+ int numCommits = addInputAndValidateIngestion(spark, fs, srcPath, 0, 100, inputDF1, inputDF2, true);
+ LOG.info("===== Showing Ends =====");
+ return numCommits;
+ });
+
+ // let the threads run
+ streamFuture.get();
+ numInitialCommits = showFuture.get();
+ } finally {
+ executor.shutdownNow();
+ }
+
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), tablePath);
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ // Ensure we have successfully completed one compaction commit
+ ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count() == 1);
+ } else {
+ ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count() >= 1);
+ }
+
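For reference, the timeline assertion above as a minimal standalone sketch, using only the HoodieTableMetaClient and ValidationUtils calls already present in this patch (the table path is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.util.ValidationUtils;

    public class TimelineCheckSketch {
      public static void main(String[] args) {
        String tablePath = "/tmp/hoodie/streaming/sample-table"; // placeholder table path
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), tablePath);
        // On MERGE_ON_READ tables a completed compaction surfaces as a commit instant,
        // so counting the commit timeline verifies that async compaction actually ran.
        long commitCount = metaClient.getActiveTimeline().getCommitTimeline().getInstants().count();
        ValidationUtils.checkArgument(commitCount >= 1, "Expected at least one commit, got " + commitCount);
      }
    }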
+ // Delete stream
+ // Restart the Spark session so that Spark does not assume multiple streams are active at once.
+ spark.close();
+ SparkSession newSpark = SparkSession.builder().appName("Hoodie Spark Streaming APP")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
+ jssc = new JavaSparkContext(newSpark.sparkContext());
+ String ckptPath2 = streamingCheckpointingPath + "/stream2";
+ String srcPath2 = srcPath + "/stream2";
+ fs.mkdirs(new Path(ckptPath2));
+ fs.mkdirs(new Path(srcPath2));
+ Dataset<Row> delStreamingInput = newSpark.readStream().schema(inputDF1.schema()).json(srcPath2 + "/*");
+ List<String> deletes = recordsToStrings(dataGen.generateUniqueUpdates("002", 20));
+ Dataset<Row> inputDF3 = newSpark.read().json(jssc.parallelize(deletes, 2));
+ executor = Executors.newFixedThreadPool(2);
+
+ // thread for spark structured streaming
+ try {
+ Future<Void> streamFuture = executor.submit(() -> {
+ LOG.info("===== Streaming Starting =====");
+ stream(delStreamingInput, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL(), ckptPath2);
+ LOG.info("===== Streaming Ends =====");
+ return null;
+ });
+
+ final int numCommits = numInitialCommits;
+ // thread for adding data to the streaming source and showing results over time
+ Future<Void> showFuture = executor.submit(() -> {
+ LOG.info("===== Showing Starting =====");
+ addInputAndValidateIngestion(newSpark, fs, srcPath2, numCommits, 80, inputDF3, null, false);
+ LOG.info("===== Showing Ends =====");
+ return null;
+ });
+
+ // let the threads run
+ streamFuture.get();
+ showFuture.get();
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ private void waitTillNCommits(FileSystem fs, int numCommits, int timeoutSecs, int sleepSecsAfterEachRun)
+ throws InterruptedException {
+ long beginTime = System.currentTimeMillis();
+ long currTime = beginTime;
+ long timeoutMsecs = timeoutSecs * 1000L;
+
+ while ((currTime - beginTime) < timeoutMsecs) {
+ try {
+ HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath);
+ LOG.info("Timeline :" + timeline.getInstants().collect(Collectors.toList()));
+ if (timeline.countInstants() >= numCommits) {
+ return;
+ }
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath, true);
+ LOG.info("Instants :" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
+ } catch (TableNotFoundException te) {
+ LOG.info("Got table not found exception. Retrying");
+ } finally {
+ Thread.sleep(sleepSecsAfterEachRun * 1000);
+ currTime = System.currentTimeMillis();
+ }
+ }
+ throw new IllegalStateException("Timed out waiting for " + numCommits + " commits to appear in " + tablePath);
}
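The same wait-for-commits idiom, distilled into a rough standalone sketch; the table path is a placeholder, and the helper calls are the ones this patch already uses:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.HoodieDataSourceHelpers;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.exception.TableNotFoundException;

    public class WaitForCommitsSketch {
      public static void main(String[] args) throws InterruptedException {
        String tablePath = "/tmp/hoodie/streaming/sample-table"; // placeholder table path
        FileSystem fs = FSUtils.getFs(tablePath, new Configuration());
        long deadline = System.currentTimeMillis() + 180 * 1000L;
        while (System.currentTimeMillis() < deadline) {
          try {
            // Completed commits, delta-commits and compactions all count towards the total.
            HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath);
            if (timeline.countInstants() >= 1) {
              return;
            }
          } catch (TableNotFoundException e) {
            // The first commit has not created the table yet; keep polling.
          }
          Thread.sleep(3000L);
        }
        throw new IllegalStateException("Timed out waiting for commits in " + tablePath);
      }
    }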
/**
@@ -175,23 +270,40 @@ public class HoodieJavaStreamingApp {
* @param inputDF2
* @throws Exception
*/
- public void show(SparkSession spark, FileSystem fs, Dataset<Row> inputDF1, Dataset<Row> inputDF2) throws Exception {
- inputDF1.write().mode(SaveMode.Append).json(streamingSourcePath);
+ public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath,
+ int initialCommits, int expRecords,
+ Dataset<Row> inputDF1, Dataset<Row> inputDF2, boolean instantTimeValidation) throws Exception {
+ inputDF1.write().mode(SaveMode.Append).json(srcPath);
+
+ int numExpCommits = initialCommits + 1;
// wait for spark streaming to process one microbatch
- Thread.sleep(3000);
+ waitTillNCommits(fs, numExpCommits, 180, 3);
String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
LOG.info("First commit at instant time :" + commitInstantTime1);
- inputDF2.write().mode(SaveMode.Append).json(streamingSourcePath);
- // wait for spark streaming to process one microbatch
- Thread.sleep(3000);
- String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
- LOG.info("Second commit at instant time :" + commitInstantTime2);
+ String commitInstantTime2 = commitInstantTime1;
+ if (null != inputDF2) {
+ numExpCommits += 1;
+ inputDF2.write().mode(SaveMode.Append).json(srcPath);
+ // wait for spark streaming to process one microbatch
+ Thread.sleep(3000);
+ waitTillNCommits(fs, numExpCommits, 180, 3);
+ commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+ LOG.info("Second commit at instant time :" + commitInstantTime2);
+ }
+
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ numExpCommits += 1;
+ // Wait for compaction to also finish and track latest timestamp as commit timestamp
+ waitTillNCommits(fs, numExpCommits, 180, 3);
+ commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+ LOG.info("Compaction commit at instant time :" + commitInstantTime2);
+ }
/**
* Read & do some queries
*/
- Dataset<Row> hoodieROViewDF = spark.read().format("org.apache.hudi")
+ Dataset<Row> hoodieROViewDF = spark.read().format("hudi")
// pass any path glob, can include hoodie & non-hoodie
// datasets
.load(tablePath + "/*/*/*/*");
@@ -200,11 +312,24 @@ public class HoodieJavaStreamingApp {
// all trips whose fare amount was greater than 2.
spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show();
+ if (instantTimeValidation) {
+ System.out.println("Showing all records. Latest Instant Time =" + commitInstantTime2);
+ spark.sql("select * from hoodie_ro").show(200, false);
+ long numRecordsAtInstant2 =
+ spark.sql("select * from hoodie_ro where _hoodie_commit_time = " + commitInstantTime2).count();
+ ValidationUtils.checkArgument(numRecordsAtInstant2 == expRecords,
+ "Expecting " + expRecords + " records, Got " + numRecordsAtInstant2);
+ }
+
+ long numRecords = spark.sql("select * from hoodie_ro").count();
+ ValidationUtils.checkArgument(numRecords == expRecords,
+ "Expecting " + expRecords + " records, Got " + numRecords);
+
if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) {
/**
* Consume incrementally, only changes in commit 2 above. Currently only supported for COPY_ON_WRITE TABLE
*/
- Dataset<Row> hoodieIncViewDF = spark.read().format("org.apache.hudi")
+ Dataset<Row> hoodieIncViewDF = spark.read().format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
// Only changes in write 2 above
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1)
@@ -214,6 +339,7 @@ public class HoodieJavaStreamingApp {
LOG.info("You will only see records from : " + commitInstantTime2);
hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
}
+ return numExpCommits;
}
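For context, the incremental pull above as a self-contained sketch; the option keys are the DataSourceReadOptions accessors visible in this diff, while the path and begin instant are placeholders:

    import org.apache.hudi.DataSourceReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class IncrementalReadSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("hudi-incremental-read").master("local[1]").getOrCreate();
        String tablePath = "/tmp/hoodie/streaming/sample-table"; // placeholder table path
        String beginInstant = "000"; // placeholder: pull all changes after this instant
        Dataset<Row> incViewDF = spark.read().format("hudi")
            .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
            .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), beginInstant)
            .load(tablePath);
        // Group by commit time to see which instants contributed changes.
        incViewDF.groupBy(incViewDF.col("_hoodie_commit_time")).count().show();
        spark.stop();
      }
    }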
/**
@@ -222,19 +348,23 @@ public class HoodieJavaStreamingApp {
* @param streamingInput
+ * @param operationType write operation to use, e.g. upsert or delete
+ * @param checkpointLocation streaming checkpoint folder
* @throws Exception
*/
- public void stream(Dataset<Row> streamingInput) throws Exception {
+ public void stream(Dataset<Row> streamingInput, String operationType, String checkpointLocation) throws Exception {
DataStreamWriter<Row> writer = streamingInput.writeStream().format("org.apache.hudi")
.option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), operationType)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
- .option(HoodieWriteConfig.TABLE_NAME, tableName).option("checkpointLocation", streamingCheckpointingPath)
+ .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
+ .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "true")
+ .option(HoodieWriteConfig.TABLE_NAME, tableName).option("checkpointLocation", checkpointLocation)
.outputMode(OutputMode.Append());
updateHiveSyncConfig(writer);
- writer.trigger(new ProcessingTime(500)).start(tablePath).awaitTermination(streamingDurationInMs);
+ StreamingQuery query = writer.trigger(new ProcessingTime(500)).start(tablePath);
+ query.awaitTermination(streamingDurationInMs);
}
/**
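Putting the new options together, a rough end-to-end sketch of a structured-streaming upsert with async compaction on a MERGE_ON_READ table; the paths and table name are placeholders, and the option keys are the ones used or introduced by this patch:

    import org.apache.hudi.DataSourceWriteOptions;
    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class AsyncCompactionStreamSketch {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("hudi-async-compaction-stream")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .master("local[2]").getOrCreate();
        String srcPath = "/tmp/hoodie/streaming/source/stream1"; // placeholder source folder
        // Streaming JSON sources need an explicit schema; infer it from a batch read, as the test app does.
        Dataset<Row> batch = spark.read().json(srcPath);
        Dataset<Row> input = spark.readStream().schema(batch.schema()).json(srcPath + "/*");
        StreamingQuery query = input.writeStream().format("org.apache.hudi")
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), "MERGE_ON_READ")
            .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
            .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
            .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
            // Schedule a compaction after every delta commit so one completes quickly.
            .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
            .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "true")
            .option(HoodieWriteConfig.TABLE_NAME, "streaming_sample_table")
            .option("checkpointLocation", "/tmp/hoodie/streaming/checkpoint/stream1")
            .outputMode(OutputMode.Append())
            .start("/tmp/hoodie/streaming/sample-table");
        query.awaitTermination();
      }
    }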
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 7f26481..feeec96 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -50,7 +50,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
try {
val sqlContext = session.sqlContext
val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
- val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, session.emptyDataFrame))
+ val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options,
+ session.emptyDataFrame))
assert(e.getMessage.contains("spark.serializer"))
} finally {
session.stop()
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
index 07e8704..48582c1 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
@@ -17,12 +17,16 @@
package org.apache.hudi.functional
+
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
@@ -39,6 +43,7 @@ import scala.concurrent.{Await, Future}
* Basic tests on the spark datasource
*/
class TestDataSource {
+ private val log = LogManager.getLogger(getClass)
var spark: SparkSession = null
var dataGen: HoodieTestDataGenerator = null
@@ -214,7 +219,7 @@ class TestDataSource {
assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
}
- //@Test (TODO: re-enable after fixing noisyness)
+ @Test
def testStructuredStreaming(): Unit = {
fs.delete(new Path(basePath), true)
val sourcePath = basePath + "/source"
@@ -254,7 +259,7 @@ class TestDataSource {
val f2 = Future {
inputDF1.write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process one microbatch
- Thread.sleep(3000)
+ val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
// Read RO View
@@ -264,9 +269,8 @@ class TestDataSource {
inputDF2.write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process one microbatch
- Thread.sleep(10000)
+ waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
-
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// Read RO View
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
@@ -299,8 +303,35 @@ class TestDataSource {
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
}
-
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+ }
+ @throws[InterruptedException]
+ private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
+ numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int): Int = {
+ val beginTime = System.currentTimeMillis
+ var currTime = beginTime
+ val timeoutMsecs = timeoutSecs * 1000
+ var numInstants = 0
+ var success: Boolean = false
+ while (!success && (currTime - beginTime) < timeoutMsecs) try {
+ val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath)
+ log.info("Timeline :" + timeline.getInstants.toArray.mkString(","))
+ if (timeline.countInstants >= numCommits) {
+ numInstants = timeline.countInstants
+ success = true
+ }
+ val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true)
+ log.info("Instants :" + metaClient.getActiveTimeline.getInstants.toArray.mkString(","))
+ } catch {
+ case _: TableNotFoundException =>
+ log.info("Got table not found exception. Retrying")
+ } finally {
+ Thread.sleep(sleepSecsAfterEachRun * 1000)
+ currTime = System.currentTimeMillis
+ }
+ if (!success) {
+ throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
+ }
+ numInstants
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 05a1bbc..3bc51c9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -18,8 +18,9 @@
package org.apache.hudi.utilities.deltastreamer;
-import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.async.AsyncCompactService;
+import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.TypedProperties;
@@ -62,15 +63,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.IntStream;
/**
* A utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
@@ -97,6 +92,8 @@ public class HoodieDeltaStreamer implements Serializable {
private final Option<BootstrapExecutor> bootstrapExecutor;
+ public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
+
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
jssc.hadoopConfiguration(), null);
@@ -559,8 +556,8 @@ public class HoodieDeltaStreamer implements Serializable {
boolean error = false;
if (cfg.isAsyncCompactionEnabled()) {
// set Scheduler Pool.
- LOG.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
- jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
+ LOG.info("Setting Spark Pool name for delta-sync to " + DELTASYNC_POOL_NAME);
+ jssc.setLocalProperty("spark.scheduler.pool", DELTASYNC_POOL_NAME);
}
try {
while (!isShutdownRequested()) {
@@ -661,100 +658,6 @@ public class HoodieDeltaStreamer implements Serializable {
}
}
- /**
- * Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time.
- */
- public static class AsyncCompactService extends AbstractAsyncService {
-
- private static final long serialVersionUID = 1L;
- private final int maxConcurrentCompaction;
- private transient Compactor compactor;
- private transient JavaSparkContext jssc;
- private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
- private transient ReentrantLock queueLock = new ReentrantLock();
- private transient Condition consumed = queueLock.newCondition();
-
- public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
- this.jssc = jssc;
- this.compactor = new Compactor(client, jssc);
- this.maxConcurrentCompaction = 1;
- }
-
- /**
- * Enqueues new Pending compaction.
- */
- public void enqueuePendingCompaction(HoodieInstant instant) {
- pendingCompactions.add(instant);
- }
-
- /**
- * Wait till outstanding pending compactions reduces to the passed in value.
- *
- * @param numPendingCompactions Maximum pending compactions allowed
- * @throws InterruptedException
- */
- public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
- try {
- queueLock.lock();
- while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
- consumed.await();
- }
- } finally {
- queueLock.unlock();
- }
- }
-
- /**
- * Fetch Next pending compaction if available.
- *
- * @return
- * @throws InterruptedException
- */
- private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
- LOG.info("Compactor waiting for next instant for compaction upto 60 seconds");
- HoodieInstant instant = pendingCompactions.poll(60, TimeUnit.SECONDS);
- if (instant != null) {
- try {
- queueLock.lock();
- // Signal waiting thread
- consumed.signal();
- } finally {
- queueLock.unlock();
- }
- }
- return instant;
- }
-
- /**
- * Start Compaction Service.
- */
- @Override
- protected Pair<CompletableFuture, ExecutorService> startService() {
- ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction);
- return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
- try {
- // Set Compactor Pool Name for allowing users to prioritize compaction
- LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
- jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
-
- while (!isShutdownRequested()) {
- final HoodieInstant instant = fetchNextCompactionInstant();
- if (null != instant) {
- compactor.compact(instant);
- }
- }
- LOG.info("Compactor shutting down properly!!");
- } catch (InterruptedException ie) {
- LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
- } catch (IOException e) {
- LOG.error("Compactor executor failed", e);
- throw new HoodieIOException(e.getMessage(), e);
- }
- return true;
- }, executor)).toArray(CompletableFuture[]::new)), executor);
- }
- }
-
public DeltaSyncService getDeltaSyncService() {
return deltaSyncService.get();
}
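For orientation, a sketch of driving the relocated org.apache.hudi.async.AsyncCompactService, assuming it keeps the constructor and enqueue/wait methods of the class removed above plus start/shutdown hooks inherited from AbstractAsyncService; the write client and instant are placeholders supplied by the caller:

    import org.apache.hudi.async.AsyncCompactService;
    import org.apache.hudi.client.HoodieWriteClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CompactionDriverSketch {
      public static void runOneCompaction(JavaSparkContext jssc, HoodieWriteClient client,
          HoodieInstant pendingCompactionInstant) throws InterruptedException {
        AsyncCompactService compactService = new AsyncCompactService(jssc, client);
        // Assumed: AbstractAsyncService exposes start(...) taking a shutdown callback.
        compactService.start(error -> true);
        // Hand a scheduled compaction instant to the background compactor ...
        compactService.enqueuePendingCompaction(pendingCompactionInstant);
        // ... and block until the queue drains, i.e. the instant has been picked up.
        compactService.waitTillPendingCompactionsReducesTo(0);
        compactService.shutdown(true);
      }
    }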
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
index 54fcf68..f5f1f38 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.deltastreamer;
+import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
@@ -41,8 +42,8 @@ public class SchedulerConfGenerator {
private static final Logger LOG = LogManager.getLogger(SchedulerConfGenerator.class);
- public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
- public static final String COMPACT_POOL_NAME = "hoodiecompact";
+ public static final String DELTASYNC_POOL_NAME = HoodieDeltaStreamer.DELTASYNC_POOL_NAME;
+ public static final String COMPACT_POOL_NAME = AsyncCompactService.COMPACT_POOL_NAME;
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR";
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
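These pool names only matter under Spark's FAIR scheduler; a minimal sketch of the wiring, assuming an allocation file that defines both pools (the file path is a placeholder):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class FairSchedulerWiringSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("hudi-delta-streamer").setMaster("local[2]")
            // Enable FAIR scheduling and point Spark at the allocation file defining the pools.
            .set("spark.scheduler.mode", "FAIR")
            .set("spark.scheduler.allocation.file", "/tmp/hoodie-fairscheduler.xml"); // placeholder path
        JavaSparkContext jssc = new JavaSparkContext(conf);
        // Jobs submitted from this thread run in the delta-sync pool; the compactor
        // thread sets "hoodiecompact" on itself the same way.
        jssc.setLocalProperty("spark.scheduler.pool", "hoodiedeltasync");
        // ... submit delta-sync work here ...
        jssc.stop();
      }
    }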