You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/27 19:50:47 UTC
[incubator-hudi] branch master updated: [HUDI-819] Fix a bug with
MergeOnReadLazyInsertIterable.
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6de9f5d [HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable.
6de9f5d is described below
commit 6de9f5d9e5cb1f82d7c32d04b114e7d4a181619b
Author: satishkotha <sa...@uber.com>
AuthorDate: Mon Apr 27 12:50:39 2020 -0700
[HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable.
The local variable declared here[1] shadows the protected statuses variable. So although hoodie writes the data, it does not include the resulting write status in the completed section. This can cause duplicates to be written (#1540)
[1] https://github.com/apache/incubator-hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java#L53
---
.../hudi/execution/BulkInsertMapFunction.java | 2 +-
...InsertIterable.java => LazyInsertIterable.java} | 35 +++++-----
.../execution/MergeOnReadLazyInsertIterable.java | 74 ----------------------
.../org/apache/hudi/io/AppendHandleFactory.java | 36 +++++++++++
.../org/apache/hudi/io/CreateHandleFactory.java | 36 +++++++++++
.../org/apache/hudi/io/WriteHandleFactory.java | 35 ++++++++++
.../table/action/commit/CommitActionExecutor.java | 4 +-
.../deltacommit/DeltaCommitActionExecutor.java | 7 +-
.../execution/TestBoundedInMemoryExecutor.java | 4 +-
.../hudi/execution/TestBoundedInMemoryQueue.java | 4 +-
10 files changed, 138 insertions(+), 99 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
index 5d4391c..67c1d75 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
@@ -50,7 +50,7 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
- return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
+ return new LazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier());
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
similarity index 81%
rename from hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
rename to hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
index 8f98496..fe0d5c4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
@@ -28,7 +28,8 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
@@ -43,26 +44,34 @@ import java.util.function.Function;
/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new files.
*/
-public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
+public class LazyInsertIterable<T extends HoodieRecordPayload>
extends LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
protected final HoodieWriteConfig hoodieConfig;
protected final String instantTime;
protected final HoodieTable<T> hoodieTable;
protected final String idPrefix;
- protected int numFilesWritten;
protected SparkTaskContextSupplier sparkTaskContextSupplier;
+ protected WriteHandleFactory<T> writeHandleFactory;
- public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
- String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
- SparkTaskContextSupplier sparkTaskContextSupplier) {
+ public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
+ String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+ SparkTaskContextSupplier sparkTaskContextSupplier) {
+ this(sortedRecordItr, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier,
+ new CreateHandleFactory<>());
+ }
+
+ public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
+ String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+ SparkTaskContextSupplier sparkTaskContextSupplier,
+ WriteHandleFactory<T> writeHandleFactory) {
super(sortedRecordItr);
this.hoodieConfig = config;
this.instantTime = instantTime;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
- this.numFilesWritten = 0;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+ this.writeHandleFactory = writeHandleFactory;
}
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -118,10 +127,6 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
@Override
protected void end() {}
- protected String getNextFileId(String idPfx) {
- return String.format("%s-%d", idPfx, numFilesWritten++);
- }
-
protected CopyOnWriteInsertHandler getInsertHandler() {
return new CopyOnWriteInsertHandler();
}
@@ -140,8 +145,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
final HoodieRecord insertPayload = payload.record;
// lazily initialize the handle, for the first time
if (handle == null) {
- handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
- getNextFileId(idPrefix), sparkTaskContextSupplier);
+ handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+ idPrefix, sparkTaskContextSupplier);
}
if (handle.canWrite(payload.record)) {
@@ -151,8 +156,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
// handle is full.
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
- handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
- getNextFileId(idPrefix), sparkTaskContextSupplier);
+ handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+ idPrefix, sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
deleted file mode 100644
index 02a9ead..0000000
--- a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.execution;
-
-import org.apache.hudi.client.SparkTaskContextSupplier;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.HoodieAppendHandle;
-import org.apache.hudi.table.HoodieTable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new log files.
- */
-public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> {
-
- public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
- String instantTime, HoodieTable<T> hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) {
- super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier);
- }
-
- @Override
- protected CopyOnWriteInsertHandler getInsertHandler() {
- return new MergeOnReadInsertHandler();
- }
-
- protected class MergeOnReadInsertHandler extends CopyOnWriteInsertHandler {
-
- @Override
- protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
- final HoodieRecord insertPayload = payload.record;
- List<WriteStatus> statuses = new ArrayList<>();
- // lazily initialize the handle, for the first time
- if (handle == null) {
- handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
- }
- if (handle.canWrite(insertPayload)) {
- // write the payload, if the handle has capacity
- handle.write(insertPayload, payload.insertValue, payload.exception);
- } else {
- // handle is full.
- handle.close();
- statuses.add(handle.getWriteStatus());
- // Need to handle the rejected payload & open new handle
- handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
- handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
- }
- }
- }
-
-}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
new file mode 100644
index 0000000..4a5554b
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public class AppendHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
+
+ @Override
+ public HoodieAppendHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
+ final HoodieTable<T> hoodieTable, final String partitionPath,
+ final String fileIdPrefix, final SparkTaskContextSupplier sparkTaskContextSupplier) {
+
+ return new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+ getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
new file mode 100644
index 0000000..68d8b4d
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public class CreateHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
+
+ @Override
+ public HoodieWriteHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
+ final HoodieTable<T> hoodieTable, final String partitionPath,
+ final String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier) {
+
+ return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+ getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
new file mode 100644
index 0000000..7039b71
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public abstract class WriteHandleFactory<T extends HoodieRecordPayload> {
+ private int numFilesWritten = 0;
+
+ public abstract HoodieWriteHandle<T> create(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
+ String partitionPath, String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier);
+
+ protected String getNextFileId(String idPfx) {
+ return String.format("%s-%d", idPfx, numFilesWritten++);
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
index b958837..5208c12 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
+import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieTable;
@@ -132,7 +132,7 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
- return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+ return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
sparkTaskContextSupplier);
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
index 775580e..be3806e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
@@ -24,8 +24,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
+import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.io.AppendHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -84,8 +85,8 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (table.getIndex().canIndexLogFiles()) {
- return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
- sparkTaskContextSupplier);
+ return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+ sparkTaskContextSupplier, new AppendHandleFactory<>());
} else {
return super.handleInsert(idPfx, recordItr);
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
index 58f898c..1db248f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.avro.generic.IndexedRecord;
import org.junit.After;
@@ -37,7 +37,7 @@ import java.util.List;
import scala.Tuple2;
-import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index 72b3eff..859cf4e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.avro.generic.IndexedRecord;
import org.junit.After;
@@ -53,7 +53,7 @@ import java.util.stream.IntStream;
import scala.Tuple2;
-import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;