You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/19 10:22:09 UTC
git commit: TEZ-938. Avoid fetching empty partitions when the OnFileSortedOutput, ShuffledMergedInput pair is used. Contributed by Rajesh Balamohan.
Repository: incubator-tez
Updated Branches:
refs/heads/master 44ea005b5 -> 8d4b8e9a5
TEZ-938. Avoid fetching empty partitions when the OnFileSortedOutput,
ShuffledMergedInput pair is used. Contributed by Rajesh Balamohan.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8d4b8e9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8d4b8e9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8d4b8e9a
Branch: refs/heads/master
Commit: 8d4b8e9a5b6cac168cd793b93e0940494c11ebc4
Parents: 44ea005
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Mar 19 02:20:25 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Mar 19 02:20:25 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 4 ++
.../shuffle/impl/ShuffleInputEventHandler.java | 21 +++++++-
.../common/shuffle/impl/ShuffleScheduler.java | 18 ++++---
.../common/sort/impl/TezIndexRecord.java | 5 ++
.../library/output/OnFileSortedOutput.java | 54 ++++++++++++++++++--
.../impl/ShuffleInputEventHandlerImpl.java | 14 +++++
.../src/main/proto/ShufflePayloads.proto | 1 +
7 files changed, 104 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 61a1b64..a45753b 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -332,6 +332,10 @@ public class TezJobConfig {
public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS =
"tez.runtime.intermediate-input.key.secondary.comparator.class";
+ public static final String TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED =
+ "tez.runtime.empty.partitions.info-via-events.enabled";
+ public static final boolean TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT = true;
+
public static final String TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED = "tez.runtime.broadcast.data-via-events.enabled";
public static final boolean TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED_DEFAULT = false;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 2676a19..54bb55e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -18,9 +18,12 @@
package org.apache.tez.runtime.library.common.shuffle.impl;
+import java.io.IOException;
import java.net.URI;
import java.util.List;
+import java.util.zip.InflaterInputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -75,7 +78,6 @@ public class ShuffleInputEventHandler {
InputAttemptIdentifier srcAttemptIdentifier =
new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
LOG.info("DataMovementEvent baseUri:" + baseUri + ", src: " + srcAttemptIdentifier);
- scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
// TODO NEWTEZ See if this duration hack can be removed.
int duration = shufflePayload.getRunDuration();
@@ -83,6 +85,23 @@ public class ShuffleInputEventHandler {
maxMapRuntime = duration;
scheduler.informMaxMapRunTime(maxMapRuntime);
}
+ if (shufflePayload.getData().hasEmptyPartitions()) {
+ try {
+ InflaterInputStream in = new
+ InflaterInputStream(shufflePayload.getData().getEmptyPartitions().newInput());
+ byte[] emptyPartition = IOUtils.toByteArray(in);
+ if (emptyPartition[partitionId] == 1) {
+ LOG.info("Got empty payload : PartitionId : " +
+ partitionId + " : " + emptyPartition[partitionId]);
+ scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
+ return;
+ }
+ } catch (IOException e) {
+ throw new TezUncheckedException("Unable to set " +
+ "the empty partition to succeeded", e);
+ }
+ }
+ scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
}
private void processTaskFailedEvent(InputFailedEvent ifEvent) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index ad197f1..f1f12ef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -157,10 +157,19 @@ class ShuffleScheduler {
) throws IOException {
String taskIdentifier = TezRuntimeUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber());
failureCounts.remove(taskIdentifier);
- hostFailures.remove(host.getHostName());
+ if (host != null) {
+ hostFailures.remove(host.getHostName());
+ }
if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
- output.commit();
+ if (output != null) {
+ output.commit();
+ if (output.getType() == Type.DISK) {
+ bytesShuffledToDisk.increment(bytesCompressed);
+ } else {
+ bytesShuffledToMem.increment(bytesCompressed);
+ }
+ }
setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
shuffledMapsCounter.increment(1);
if (--remainingMaps == 0) {
@@ -173,11 +182,6 @@ class ShuffleScheduler {
logProgress();
reduceShuffleBytes.increment(bytesCompressed);
reduceBytesDecompressed.increment(bytesDecompressed);
- if (output.getType() == Type.DISK) {
- bytesShuffledToDisk.increment(bytesCompressed);
- } else {
- bytesShuffledToMem.increment(bytesCompressed);
- }
if (LOG.isDebugEnabled()) {
LOG.debug("src task: "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
index 95ae8eb..f6fd802 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
@@ -42,4 +42,9 @@ public class TezIndexRecord {
public long getPartLength() {
return partLength;
}
+
+ public boolean hasData() {
+ //TEZ-941 - Avoid writing out empty partitions
+ return !(rawLength == 2);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 7ed5c21..8c64ed0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -22,8 +22,13 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
@@ -36,12 +41,16 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
/**
* <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs
@@ -49,16 +58,17 @@ import com.google.common.collect.Lists;
*/
public class OnFileSortedOutput implements LogicalOutput {
+ private static final Log LOG = LogFactory.getLog(OnFileSortedOutput.class);
+
protected ExternalSorter sorter;
protected Configuration conf;
protected int numOutputs;
protected TezOutputContext outputContext;
private long startTime;
private long endTime;
-
+ private boolean sendEmptyPartitionDetails;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
-
+
@Override
public List<Event> initialize(TezOutputContext outputContext)
throws IOException {
@@ -76,7 +86,10 @@ public class OnFileSortedOutput implements LogicalOutput {
} else {
sorter = new DefaultSorter();
}
-
+
+ sendEmptyPartitionDetails = this.conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+ TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
sorter.initialize(outputContext, conf, numOutputs);
return Collections.emptyList();
}
@@ -126,6 +139,37 @@ public class OnFileSortedOutput implements LogicalOutput {
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
+
+ if (sendEmptyPartitionDetails) {
+ Path indexFile = sorter.getMapOutput().getOutputIndexFile();
+ TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+ //TODO: replace with BitSet in JDK 1.7 (no support for valueOf, toByteArray in 1.6)
+ byte[] partitionDetails = new byte[numOutputs];
+ int emptyPartitions = 0;
+ for(int i=0;i<spillRecord.size();i++) {
+ TezIndexRecord indexRecord = spillRecord.getIndex(i);
+ if (!indexRecord.hasData()) {
+ partitionDetails[i] = 1;
+ emptyPartitions++;
+ }
+ }
+ if (emptyPartitions > 0) {
+ //compress the data
+ ByteString.Output os = ByteString.newOutput();
+ DeflaterOutputStream compressOs = new DeflaterOutputStream(os,
+ new Deflater(Deflater.BEST_COMPRESSION));
+ compressOs.write(partitionDetails);
+ compressOs.finish();
+
+ ShuffleUserPayloads.DataProto.Builder dataProtoBuilder = ShuffleUserPayloads.DataProto.newBuilder();
+ dataProtoBuilder.setEmptyPartitions(os.toByteString());
+ payloadBuilder.setData(dataProtoBuilder.build());
+
+ LOG.info("EmptyPartition bitsetSize=" + partitionDetails.length + ", numOutputs="
+ + numOutputs + ", emptyPartitions=" + emptyPartitions
+ + ", compressedSize=" + os.toByteString().size());
+ }
+ }
payloadBuilder.setHost(host);
payloadBuilder.setPort(shufflePort);
payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
@@ -149,4 +193,4 @@ public class OnFileSortedOutput implements LogicalOutput {
return events;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 63957e9..5f2b368 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -21,7 +21,9 @@ package org.apache.tez.runtime.library.shuffle.common.impl;
import java.io.IOException;
import java.util.List;
+import java.util.zip.InflaterInputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -101,6 +103,18 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
shufflePayload.getPathComponent());
if (shufflePayload.hasData()) {
DataProto dataProto = shufflePayload.getData();
+ if (dataProto.hasEmptyPartitions()) {
+ int partitionId = dme.getSourceIndex();
+ InflaterInputStream in = new
+ InflaterInputStream(shufflePayload.getData().getEmptyPartitions().newInput());
+ byte[] emptyPartition = IOUtils.toByteArray(in);
+ if (emptyPartition[partitionId] == 1) {
+ LOG.info("Got empty payload : PartitionId : " +
+ partitionId + " : " + emptyPartition[partitionId]);
+ shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
+ return;
+ }
+ }
FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
moveDataToFetchedInput(dataProto, fetchedInput);
shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d4b8e9a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index b4ae332..ad676bf 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -33,6 +33,7 @@ message DataProto {
optional int32 raw_length = 1;
optional int32 compressed_length = 2;
optional bytes data = 3;
+ optional bytes empty_partitions = 4;
}
message InputInformationEventPayloadProto {