You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@tez.apache.org by rb...@apache.org on 2019/10/07 06:57:42 UTC
[tez] branch master updated: TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled (#48) (Contributed by Richard Zhang)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new de019d5 TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled (#48) (Contributed by Richard Zhang)
de019d5 is described below
commit de019d5e31ba72659dd88a3d8fe5db8b4af0a3cd
Author: rbalamohan <rb...@apache.org>
AuthorDate: Mon Oct 7 12:27:38 2019 +0530
TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled (#48) (Contributed by Richard Zhang)
* TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled
* TEZ-4075: Reimplement tez.runtime.transfer.data-via-events.enabled
---
.../apache/tez/common/counters/TaskCounter.java | 10 +++-
.../library/api/TezRuntimeConfiguration.java | 14 +++++
.../library/common/shuffle/ShuffleUtils.java | 3 +-
.../shuffle/impl/ShuffleInputEventHandlerImpl.java | 49 ++++++++++++++-
.../common/shuffle/impl/ShuffleManager.java | 46 ++++++++++++++
.../writers/BaseUnorderedPartitionedKVWriter.java | 6 ++
.../writers/UnorderedPartitionedKVWriter.java | 63 ++++++++++++++++---
.../runtime/library/output/UnorderedKVOutput.java | 2 +
.../output/UnorderedPartitionedKVOutput.java | 2 +
.../src/main/proto/ShufflePayloads.proto | 2 +
.../writers/TestUnorderedPartitionedKVWriter.java | 70 ++++++++++++++--------
.../TestUnorderedPartitionedKVOutputConfig.java | 8 +++
.../test/java/org/apache/tez/test/TestTezJobs.java | 55 +++++++++++++++++
13 files changed, 293 insertions(+), 37 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 80424c7..e604f37 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -220,5 +220,13 @@ public enum TaskCounter {
*
* Represented in milliseconds
*/
- LAST_EVENT_RECEIVED
+ LAST_EVENT_RECEIVED,
+
+
+ /**
+ * The size of the data that is transmitted via event.
+ *
+ * Represented in number of bytes
+ */
+ DATA_BYTES_VIA_EVENT
}
\ No newline at end of file
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 86792e2..d4532c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -484,6 +484,18 @@ public class TezRuntimeConfiguration {
"empty.partitions.info-via-events.enabled";
public static final boolean TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT = true;
+ @Private
+ public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED =
+ TEZ_RUNTIME_PREFIX + "transfer.data-via-events.enabled";
+ @Private
+ public static final boolean TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT = false;
+
+ @Private
+ public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE =
+ TEZ_RUNTIME_PREFIX + "transfer.data-via-events.max-size";
+ @Private
+ public static final int TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 512;
+
/**
* If the shuffle input is on the local host bypass the http fetch and access the files directly
*/
@@ -619,6 +631,8 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
+ tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
+ tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index df4281a..00f3745 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -263,7 +263,8 @@ public class ShuffleUtils {
sb.append("host: " + dmProto.getHost()).append(", ");
sb.append("port: " + dmProto.getPort()).append(", ");
sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
- sb.append("runDuration: " + dmProto.getRunDuration());
+ sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
+ sb.append("hasDataInEvent: " + dmProto.hasData());
sb.append("]");
return sb.toString();
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 542ec34..7ad1389 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -43,7 +43,11 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -166,6 +170,9 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
private void processDataMovementEvent(DataMovementEvent dme, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
int srcIndex = dme.getSourceIndex();
+
+ String hostIdentifier = shufflePayload.getHost() + ":" + shufflePayload.getPort();
+
if (LOG.isDebugEnabled()) {
LOG.debug("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
+ ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils
@@ -189,7 +196,47 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(),
shufflePayload, (useSharedInputs && srcIndex == 0));
- shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
+ if (shufflePayload.hasData()) {
+ DataProto dataProto = shufflePayload.getData();
+
+ FetchedInput fetchedInput =
+ inputAllocator.allocate(dataProto.getRawLength(),
+ dataProto.getCompressedLength(), srcAttemptIdentifier);
+ moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
+ shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
+
+ LOG.debug("Payload via DME : " + srcAttemptIdentifier);
+ } else {
+ shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
+ srcAttemptIdentifier, srcIndex);
+ }
+ }
+
+ private void moveDataToFetchedInput(DataProto dataProto,
+ FetchedInput fetchedInput, String hostIdentifier) throws IOException {
+ switch (fetchedInput.getType()) {
+ case DISK:
+ ShuffleUtils
+ .shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
+ hostIdentifier, dataProto.getData().newInput(),
+ dataProto.getCompressedLength(),
+ dataProto.getUncompressedLength(), LOG,
+ fetchedInput.getInputAttemptIdentifier(), ifileReadAhead,
+ ifileReadAheadLength, true);
+ break;
+ case MEMORY:
+ ShuffleUtils
+ .shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
+ dataProto.getData().newInput(), dataProto.getRawLength(),
+ dataProto.getCompressedLength(),
+ codec, ifileReadAhead, ifileReadAheadLength, LOG,
+ fetchedInput.getInputAttemptIdentifier());
+ break;
+ case WAIT:
+ default:
+ throw new TezUncheckedException("Unexpected type: "
+ + fetchedInput.getType());
+ }
}
private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent crdme, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index ba8592f..5e03f08 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -665,6 +665,52 @@ public class ShuffleManager implements FetcherCallback {
}
}
+ public void addCompletedInputWithData(
+ InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
+ throws IOException {
+ //InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+ int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received Data via Event: " + srcAttemptIdentifier + " to "
+ + fetchedInput.getType());
+ }
+ // Count irrespective of whether this is a copy of an already fetched input
+ lock.lock();
+ try {
+ lastProgressTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
+
+ boolean committed = false;
+ if (!completedInputSet.get(inputIdentifier)) {
+ synchronized (completedInputSet) {
+ if (!completedInputSet.get(inputIdentifier)) {
+ fetchedInput.commit();
+ committed = true;
+ if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+ registerCompletedInput(fetchedInput);
+ } else {
+ registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier,
+ fetchedInput);
+ }
+ }
+ }
+ }
+ if (!committed) {
+ fetchedInput.abort(); // If this fails, the fetcher may attempt another
+ // abort.
+ } else {
+ lock.lock();
+ try {
+ // Signal the wakeLoop to check for termination.
+ wakeLoop.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
protected synchronized void updateEventReceivedTime() {
long relativeTime = System.currentTimeMillis() - startTime;
if (firstEventReceived.getValue() == 0) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index 30d1adb..3467c82 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -101,6 +101,11 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
*/
protected final TezCounter numAdditionalSpillsCounter;
+ /**
+ * Represents the number of bytes that is transmitted via the event.
+ */
+ protected final TezCounter dataViaEventSize;
+
@SuppressWarnings("unchecked")
public BaseUnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf, int numOutputs) {
this.outputContext = outputContext;
@@ -122,6 +127,7 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
additionalSpillBytesWritternCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
additionalSpillBytesReadCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
numAdditionalSpillsCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+ dataViaEventSize = outputContext.getCounters().findCounter(TaskCounter.DATA_BYTES_VIA_EVENT);
// compression
if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 29478dc..7165205 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezCommonUtils;
@@ -70,6 +71,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,6 +130,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
// uncompressed size for each partition
private final long[] sizePerPartition;
private volatile long spilledSize = 0;
+ private boolean dataViaEventsEnabled;
+ private int dataViaEventsMaxSize;
static final ThreadLocal<Deflater> deflater = new ThreadLocal<Deflater>() {
@@ -210,6 +214,15 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
this.pipelinedShuffle = pipelinedShuffleConf && !isFinalMergeEnabled;
this.finalEvents = Lists.newLinkedList();
+ this.dataViaEventsEnabled = conf.getBoolean(
+ TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED,
+ TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT);
+
+ // No max cap on size (intentional)
+ this.dataViaEventsMaxSize = conf.getInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE,
+ TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT);
+
if (availableMemoryBytes == 0) {
Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory "
+ "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration
@@ -270,14 +283,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
outputLargeRecordsCounter = outputContext.getCounters().findCounter(
TaskCounter.OUTPUT_LARGE_RECORDS);
-
-
indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
if (numPartitions == 1 && !pipelinedShuffle) {
//special case, where in only one partition is available.
finalOutPath = outputFileHandler.getOutputFileForWrite();
- finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
skipBuffers = true;
writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
codec, outputRecordsCounter, outputRecordBytesCounter);
@@ -298,7 +308,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
+ ", pipelinedShuffle=" + pipelinedShuffle
+ ", isFinalMergeEnabled=" + isFinalMergeEnabled
+ ", numPartitions=" + numPartitions
- + ", reportPartitionStats=" + reportPartitionStats);
+ + ", reportPartitionStats=" + reportPartitionStats
+ + ", dataViaEventsEnabled=" + dataViaEventsEnabled
+ + ", dataViaEventsMaxSize=" + dataViaEventsMaxSize
+ );
}
private static final int ALLOC_OVERHEAD = 64;
@@ -684,6 +697,21 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
return reqBytes;
}
+ private boolean canSendDataOverDME() {
+ return (writer != null) && (dataViaEventsEnabled)
+ && (writer.getCompressedLength() <= dataViaEventsMaxSize);
+ }
+
+ private byte[] readDataForDME() throws IOException {
+ // TODO: Not introducing a caching layer in IFile yet.
+ byte[] buf = null;
+ try (FSDataInputStream inStream = rfs.open(finalOutPath)) {
+ buf = new byte[(int) writer.getCompressedLength()];
+ IOUtils.readFully(inStream, buf, 0, (int) writer.getCompressedLength());
+ }
+ return buf;
+ }
+
@Override
public List<Event> close() throws IOException, InterruptedException {
// In case there are buffers to be spilled, schedule spilling
@@ -721,10 +749,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
writer.close();
long rawLen = writer.getRawLength();
long compLen = writer.getCompressedLength();
- TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
- TezSpillRecord sr = new TezSpillRecord(1);
- sr.putIndex(rec, 0);
- sr.writeToFile(finalIndexPath, conf);
BitSet emptyPartitions = new BitSet();
if (outputRecordsCounter.getValue() == 0) {
@@ -742,8 +766,17 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate);
}
eventList.add(generateVMEvent());
+
+ if (!canSendDataOverDME()) {
+ TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
+ TezSpillRecord sr = new TezSpillRecord(1);
+ sr.putIndex(rec, 0);
+ finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+ sr.writeToFile(finalIndexPath, conf);
+ }
eventList.add(generateDMEvent(false, -1, false, outputContext
- .getUniqueIdentifier(), emptyPartitions));
+ .getUniqueIdentifier(), emptyPartitions));
+
return eventList;
}
@@ -856,6 +889,18 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
payloadBuilder.setLastEvent(isLastSpill);
}
+ if (canSendDataOverDME()) {
+ ShuffleUserPayloads.DataProto.Builder dataProtoBuilder = ShuffleUserPayloads.DataProto.newBuilder();
+ dataProtoBuilder.setData(ByteString.copyFrom(readDataForDME()));
+ dataProtoBuilder.setRawLength((int) this.writer.getRawLength());
+
+ dataProtoBuilder.setCompressedLength((int) this.writer.getCompressedLength());
+ payloadBuilder.setData(dataProtoBuilder.build());
+
+ this.dataViaEventSize.increment(this.writer.getCompressedLength());
+ LOG.debug("payload packed in DME, dataSize: " + this.writer.getCompressedLength());
+ }
+
ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer();
return CompositeDataMovementEvent.create(0, numPartitions, payload);
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index c987024..50e1e8b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -167,6 +167,8 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 94312f7..5c56083 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -140,6 +140,8 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 0a4f4a6..0d13710 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -26,6 +26,7 @@ message DataMovementEventPayloadProto {
optional int32 port = 3;
optional string path_component = 4;
optional int32 run_duration = 5;
+ optional DataProto data = 6;
optional bool pipelined = 7; // Related to pipelined shuffle
optional bool last_event = 8; // Related to pipelined shuffle
optional int32 spill_id = 9; // Related to pipelined shuffle.
@@ -35,6 +36,7 @@ message DataProto {
optional int32 raw_length = 1;
optional int32 compressed_length = 2;
optional bytes data = 3;
+ optional int32 uncompressed_length = 4;
}
message InputInformationEventPayloadProto {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 83bde7b..4fa0311 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
+import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -264,7 +265,7 @@ public class TestUnorderedPartitionedKVWriter {
@Test(timeout = 10000)
public void testMultipleSpillsWithSmallBuffer() throws IOException, InterruptedException {
// numBuffers is much higher than available threads.
- baseTest(200, 10, null, shouldCompress, 512, 0, 9600);
+ baseTest(200, 10, null, shouldCompress, 512, 0, 9600, false);
}
@Test(timeout = 10000)
@@ -280,7 +281,9 @@ public class TestUnorderedPartitionedKVWriter {
@Test(timeout = 10000)
public void testNoRecords_SinglePartition() throws IOException, InterruptedException {
// skipBuffers
- baseTest(0, 1, null, shouldCompress, -1, 0);
+ baseTest(0, 1, null, shouldCompress, -1, 0, 2048, false);
+ // Check with data via events
+ baseTest(0, 1, null, shouldCompress, -1, 0, 2048, true);
}
@Test(timeout = 10000)
@@ -488,6 +491,7 @@ public class TestUnorderedPartitionedKVWriter {
assertEquals(numPartitions, cdme.getCount());
DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(
ByteString.copyFrom(cdme.getUserPayload()));
+ assertFalse(eventProto.hasData());
BitSet emptyPartitionBits = null;
if (partitionsWithData.cardinality() != numPartitions) {
assertTrue(eventProto.hasEmptyPartitions());
@@ -1095,12 +1099,12 @@ public class TestUnorderedPartitionedKVWriter {
boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent)
throws IOException, InterruptedException {
baseTest(numRecords, numPartitions, skippedPartitions, shouldCompress,
- maxSingleBufferSizeBytes, bufferMergePercent, 2048);
+ maxSingleBufferSizeBytes, bufferMergePercent, 2048, false);
}
private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent, int
- availableMemory)
+ availableMemory, boolean dataViaEventEnabled)
throws IOException, InterruptedException {
PartitionerForTest partitioner = new PartitionerForTest();
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
@@ -1189,7 +1193,7 @@ public class TestUnorderedPartitionedKVWriter {
long fileOutputBytes = fileOutputBytesCounter.getValue();
if (numRecordsWritten > 0) {
assertTrue(fileOutputBytes > 0);
- if (!shouldCompress) {
+ if ((!shouldCompress) && (!dataViaEventEnabled)) {
assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue());
}
} else {
@@ -1262,37 +1266,53 @@ public class TestUnorderedPartitionedKVWriter {
return;
}
+ boolean isInMem= eventProto.getData().hasData();
assertTrue(localFs.exists(outputFilePath));
- assertTrue(localFs.exists(spillFilePath));
- assertEquals("Incorrect output permissions", (short)0640,
- localFs.getFileStatus(outputFilePath).getPermission().toShort());
- assertEquals("Incorrect index permissions", (short)0640,
- localFs.getFileStatus(spillFilePath).getPermission().toShort());
-
- // verify no intermediate spill files have been left around
- synchronized (kvWriter.spillInfoList) {
- for (SpillInfo spill : kvWriter.spillInfoList) {
- assertFalse("lingering intermediate spill file " + spill.outPath,
- localFs.exists(spill.outPath));
+ assertEquals("Incorrect output permissions", (short) 0640,
+ localFs.getFileStatus(outputFilePath).getPermission().toShort());
+ if( !isInMem ) {
+ assertTrue(localFs.exists(spillFilePath));
+ assertEquals("Incorrect index permissions", (short) 0640,
+ localFs.getFileStatus(spillFilePath).getPermission().toShort());
+
+ // verify no intermediate spill files have been left around
+ synchronized (kvWriter.spillInfoList) {
+ for (SpillInfo spill : kvWriter.spillInfoList) {
+ assertFalse("lingering intermediate spill file " + spill.outPath,
+ localFs.exists(spill.outPath));
+ }
}
}
// Special case for 0 records.
- TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
DataInputBuffer keyBuffer = new DataInputBuffer();
DataInputBuffer valBuffer = new DataInputBuffer();
IntWritable keyDeser = new IntWritable();
LongWritable valDeser = new LongWritable();
for (int i = 0; i < numOutputs; i++) {
- TezIndexRecord indexRecord = spillRecord.getIndex(i);
- if (skippedPartitions != null && skippedPartitions.contains(i)) {
- assertFalse("The Index Record for partition " + i + " should not have any data", indexRecord.hasData());
- continue;
+ IFile.Reader reader = null;
+ InputStream inStream;
+ if (isInMem) {
+ // Read from in memory payload
+ int dataLoadSize = eventProto.getData().getData().size();
+ inStream = new ByteArrayInputStream(eventProto.getData().getData().toByteArray());
+ reader = new IFile.Reader(inStream, dataLoadSize, codec, null,
+ null, false, 0, -1);
+ } else {
+ TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
+ TezIndexRecord indexRecord = spillRecord.getIndex(i);
+ if (skippedPartitions != null && skippedPartitions.contains(i)) {
+ assertFalse("The Index Record for partition " + i + " should not have any data", indexRecord.hasData());
+ continue;
+ }
+
+ FSDataInputStream tmpStream = FileSystem.getLocal(conf).open(outputFilePath);
+ tmpStream.seek(indexRecord.getStartOffset());
+ inStream = tmpStream;
+ reader = new IFile.Reader(tmpStream, indexRecord.getPartLength(), codec, null,
+ null, false, 0, -1);
}
- FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
- inStream.seek(indexRecord.getStartOffset());
- IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null,
- null, false, 0, -1);
+
while (reader.nextRawKey(keyBuffer)) {
reader.nextRawValue(valBuffer);
keyDeser.readFields(keyBuffer);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
index 5e49b51..4bcff88 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
@@ -83,6 +83,10 @@ public class TestUnorderedPartitionedKVOutputConfig {
.setAdditionalConfiguration("fs.shouldExist", "fs")
.setAdditionalConfiguration("test.key.1", "key1")
.setAdditionalConfiguration(TezRuntimeConfiguration
+ .TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, "true")
+ .setAdditionalConfiguration(TezRuntimeConfiguration
+ .TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, "5120")
+ .setAdditionalConfiguration(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, "true")
.setAdditionalConfiguration(TezRuntimeConfiguration
.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "false")
@@ -121,6 +125,10 @@ public class TestUnorderedPartitionedKVOutputConfig {
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT));
assertEquals(2222,
conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
+ assertEquals(true,
+ conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, false));
+ assertEquals(5120,
+ conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, 512));
assertEquals("io", conf.get("io.shouldExist"));
assertEquals("file", conf.get("file.shouldExist"));
assertEquals("fs", conf.get("fs.shouldExist"));
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 2a94d9b..b661519 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -55,6 +55,7 @@ import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.mapreduce.examples.CartesianProduct;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -215,6 +216,60 @@ public class TestTezJobs {
assertEquals(0, expectedResult.size());
}
+ /**
+ * test whole {@link HashJoinExample} pipeline as following: <br>
+ * {@link JoinDataGen} -> {@link HashJoinExample} -> {@link JoinValidate}
+ * @throws Exception
+ */
+ @Test(timeout = 120000)
+ public void testHashJoinExampleWithDataViaEvent() throws Exception {
+
+ Path testDir = new Path("/tmp/testHashJoinExampleDataViaEvent");
+ Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+ remoteFs.mkdirs(stagingDirPath);
+ remoteFs.mkdirs(testDir);
+
+ Path dataPath1 = new Path(testDir, "inPath1");
+ Path dataPath2 = new Path(testDir, "inPath2");
+ Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+ Path outPath = new Path(testDir, "outPath");
+
+ TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+
+ //turn on the dataViaEvent
+ tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED, true);
+
+ TezClient tezSession = null;
+ try {
+ tezSession = TezClient.create("HashJoinExampleSession", tezConf, true);
+ tezSession.start();
+
+ JoinDataGen dataGen = new JoinDataGen();
+ String[] dataGenArgs = new String[] {
+ "-counter",
+ dataPath1.toString(), "1048576", dataPath2.toString(), "8",
+ expectedOutputPath.toString(), "2" };
+ assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
+
+ HashJoinExample joinExample = new HashJoinExample();
+ String[] args = new String[] {
+ dataPath1.toString(), dataPath2.toString(), "1", outPath.toString(),
+ "doBroadcast"};
+
+ assertEquals(0, joinExample.run(tezConf, args, tezSession));
+
+ JoinValidate joinValidate = new JoinValidate();
+ String[] validateArgs = new String[] {
+ "-counter", expectedOutputPath.toString(), outPath.toString(), "3" };
+ assertEquals(0, joinValidate.run(tezConf, validateArgs, tezSession));
+ } finally {
+ if (tezSession != null) {
+ tezSession.stop();
+ }
+ }
+ }
+
@Test(timeout = 60000)
public void testHashJoinExampleDisableSplitGrouping() throws Exception {
HashJoinExample hashJoinExample = new HashJoinExample();