You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/16 22:54:43 UTC
tez git commit: TEZ-3216. Add support for more precise partition
stats in VertexManagerEvent. Contributed by Ming Ma.
Repository: tez
Updated Branches:
refs/heads/master cc33410d8 -> 80ba12b2a
TEZ-3216. Add support for more precise partition stats in VertexManagerEvent. Contributed by Ming Ma.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/80ba12b2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/80ba12b2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/80ba12b2
Branch: refs/heads/master
Commit: 80ba12b2ad03ccb860aafc53a46c447aaa242d0d
Parents: cc33410
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jun 16 15:54:01 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 16 15:54:01 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../library/api/TezRuntimeConfiguration.java | 79 +++++++++++++++++-
.../library/common/shuffle/ShuffleUtils.java | 85 +++++++++++++++-----
.../common/sort/impl/ExternalSorter.java | 19 +++--
.../common/sort/impl/PipelinedSorter.java | 10 ++-
.../common/sort/impl/dflt/DefaultSorter.java | 2 +-
.../writers/UnorderedPartitionedKVWriter.java | 72 ++++++++---------
.../output/OrderedPartitionedKVOutput.java | 2 +-
.../library/output/UnorderedKVOutput.java | 1 +
.../output/UnorderedPartitionedKVOutput.java | 1 +
.../src/main/proto/ShufflePayloads.proto | 7 ++
.../common/shuffle/TestShuffleUtils.java | 15 ++--
.../common/sort/impl/TestPipelinedSorter.java | 10 ++-
.../TestUnorderedPartitionedKVWriter.java | 81 +++++++++++++------
.../TestOrderedPartitionedKVEdgeConfig.java | 10 ++-
.../library/output/TestOnFileSortedOutput.java | 54 +++++++++----
16 files changed, 321 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1e1803d..5f90539 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3216. Add support for more precise partition stats in VertexManagerEvent.
TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements
TEZ-3294. DAG.createDag() does not clear local state on repeat calls.
TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 08f76f2..4d24bfb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -175,11 +175,16 @@ public class TezRuntimeConfiguration {
/**
* Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496
* This can be enabled/disabled at vertex level.
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats}
+ * defines the list of values that can be specified.
+ * TODO TEZ-3303: ShuffleVertexManager doesn't consume precise stats yet,
+ * so do not set the value to "precise" when ShuffleVertexManager is used.
*/
- @ConfigurationProperty(type = "boolean")
- public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS = TEZ_RUNTIME_PREFIX +
- "report.partition.stats";
- public static final boolean TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT = true;
+ @ConfigurationProperty
+ public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS =
+ TEZ_RUNTIME_PREFIX + "report.partition.stats";
+ public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT =
+ ReportPartitionStats.MEMORY_OPTIMIZED.getType();
/**
* Size of the buffer to use if not writing directly to disk.
@@ -635,4 +640,70 @@ public class TezRuntimeConfiguration {
public static Map<String, String> getOtherConfigDefaults() {
return Collections.unmodifiableMap(otherConfMap);
}
+
+ public enum ReportPartitionStats {
+ /**
+ * Don't report partition stats. Equivalent to {@link #NONE}; kept for
+ * backward compatibility because {@link TezRuntimeConfiguration#TEZ_RUNTIME_REPORT_PARTITION_STATS}
+ * used to be a boolean property.
+ * @deprecated use {@link #NONE} instead.
+ */
+ @Deprecated
+ DISABLED("false"),
+
+ /**
+ * Report partition stats. Equivalent to {@link #MEMORY_OPTIMIZED}; kept
+ * for backward compatibility because {@link TezRuntimeConfiguration#TEZ_RUNTIME_REPORT_PARTITION_STATS}
+ * used to be a boolean property.
+ * @deprecated use {@link #MEMORY_OPTIMIZED} instead.
+ */
+ @Deprecated
+ ENABLED("true"),
+
+ /**
+ * Don't report partition stats.
+ */
+ NONE("none"),
+
+ /**
+ * Report partition stats with less precision to reduce
+ * memory and CPU overhead.
+ */
+ MEMORY_OPTIMIZED("memory_optimized"),
+
+ /**
+ * Report precise partition stats in MB.
+ */
+ PRECISE("precise");
+
+ private final String type;
+
+ private ReportPartitionStats(String type) {
+ this.type = type;
+ }
+
+ public final String getType() {
+ return type;
+ }
+
+ public boolean isEnabled() {
+ return !equals(ReportPartitionStats.DISABLED) &&
+ !equals(ReportPartitionStats.NONE);
+ }
+
+ public boolean isPrecise() {
+ return equals(ReportPartitionStats.PRECISE);
+ }
+
+ public static ReportPartitionStats fromString(String type) {
+ if (type != null) {
+ for (ReportPartitionStats b : ReportPartitionStats.values()) {
+ if (type.equalsIgnoreCase(b.type)) {
+ return b;
+ }
+ }
+ }
+ throw new IllegalArgumentException("Invalid type " + type);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index ae646ea..d74e447 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import javax.crypto.SecretKey;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
@@ -67,11 +68,13 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DetailedPartitionStatsProto;
public class ShuffleUtils {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+ private static final long MB = 1024L * 1024L;
//Shared by multiple threads
private static volatile SSLFactory sslFactory;
@@ -400,7 +403,8 @@ public class ShuffleUtils {
public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
- @Nullable long[] partitionStats) throws IOException {
+ @Nullable long[] partitionStats, boolean reportDetailedPartitionStats)
+ throws IOException {
Preconditions.checkArgument(eventList != null, "EventList can't be null");
context.notifyProgress();
@@ -420,34 +424,50 @@ public class ShuffleUtils {
finalMergeEnabled, isLastEvent, pathComponent);
if (finalMergeEnabled || isLastEvent) {
- ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
- ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
-
- long outputSize = context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+ VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,
+ reportDetailedPartitionStats);
+ eventList.add(vmEvent);
+ }
- //Set this information only when required. In pipelined shuffle, multiple events would end
- // up adding up to final outputsize. This is needed for auto-reduce parallelism to work
- // properly.
- vmBuilder.setOutputSize(outputSize);
+ CompositeDataMovementEvent csdme =
+ CompositeDataMovementEvent.create(0, numPhysicalOutputs, payload);
+ eventList.add(csdme);
+ }
- //set partition stats
- if (partitionStats != null && partitionStats.length > 0) {
- RoaringBitmap stats = getPartitionStatsForPhysicalOutput(partitionStats);
+ public static VertexManagerEvent generateVMEvent(OutputContext context,
+ long[] sizePerPartition, boolean reportDetailedPartitionStats)
+ throws IOException {
+ ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
+ ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
+
+ long outputSize = context.getCounters().
+ findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+
+ // Set this information only when required. In pipelined shuffle,
+ // multiple events would end up adding up to final output size.
+ // This is needed for auto-reduce parallelism to work properly.
+ vmBuilder.setOutputSize(outputSize);
+
+ //set partition stats
+ if (sizePerPartition != null && sizePerPartition.length > 0) {
+ if (reportDetailedPartitionStats) {
+ vmBuilder.setDetailedPartitionStats(
+ getDetailedPartitionStatsForPhysicalOutput(sizePerPartition));
+ } else {
+ RoaringBitmap stats = getPartitionStatsForPhysicalOutput(
+ sizePerPartition);
DataOutputBuffer dout = new DataOutputBuffer();
stats.serialize(dout);
- ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+ ByteString partitionStatsBytes =
+ TezCommonUtils.compressByteArrayToByteString(dout.getData());
vmBuilder.setPartitionStats(partitionStatsBytes);
}
-
- VertexManagerEvent vmEvent = VertexManagerEvent.create(
- context.getDestinationVertexName(), vmBuilder.build().toByteString().asReadOnlyByteBuffer());
- eventList.add(vmEvent);
}
-
- CompositeDataMovementEvent csdme =
- CompositeDataMovementEvent.create(0, numPhysicalOutputs, payload);
- eventList.add(csdme);
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(
+ context.getDestinationVertexName(),
+ vmBuilder.build().toByteString().asReadOnlyByteBuffer());
+ return vmEvent;
}
/**
@@ -469,6 +489,29 @@ public class ShuffleUtils {
return partitionStats;
}
+ static long ceil(long a, long b) { // ceiling of a / b; requires b > 0
+ return a / b + ((a % b > 0) ? 1 : 0); // unlike (a + b - 1) / b, cannot overflow
+ }
+
+ /**
+ * Builds per-partition size stats, rounding each size in bytes up to MB.
+ * @param sizes partition sizes in bytes, indexed by partition
+ * @return proto holding one size_in_mb value per element of {@code sizes}
+ */
+ public static DetailedPartitionStatsProto
+ getDetailedPartitionStatsForPhysicalOutput(long[] sizes) {
+ DetailedPartitionStatsProto.Builder statsBuilder =
+ DetailedPartitionStatsProto.newBuilder();
+ for (long partitionBytes : sizes) {
+ // Rounding up means any non-empty partition (even 1 byte) reports
+ // at least 1 MB. Ints.checkedCast throws IllegalArgumentException
+ // above Integer.MAX_VALUE MB (~2 PB), which is considered
+ // unreachable for a single partition.
+ int roundedMb = Ints.checkedCast(ceil(partitionBytes, MB));
+ statsBuilder.addSizeInMb(roundedMb);
+ }
+ return statsBuilder.build();
+ }
/**
* Log individual fetch complete event.
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 7a2dc68..b6fe457 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -53,6 +53,7 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
@@ -159,16 +160,19 @@ public abstract class ExternalSorter {
protected final TezCounter numAdditionalSpills;
// Number of files offered via shuffle-handler to consumers.
protected final TezCounter numShuffleChunks;
+ // How partition stats should be reported.
+ final ReportPartitionStats reportPartitionStats;
public ExternalSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
this.outputContext = outputContext;
this.conf = conf;
this.partitions = numOutputs;
- boolean reportPartitionStats = conf.getBoolean(TezRuntimeConfiguration
- .TEZ_RUNTIME_REPORT_PARTITION_STATS,
- TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT);
- this.partitionStats = (reportPartitionStats) ? (new long[partitions]) : null;
+ reportPartitionStats = ReportPartitionStats.fromString(
+ conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+ partitionStats = reportPartitionStats.isEnabled() ?
+ (new long[partitions]) : null;
cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);
@@ -202,7 +206,8 @@ public abstract class ExternalSorter {
+ ", valueSerializerClass=" + valSerializer
+ ", comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf)
+ ", partitioner=" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS)
- + ", serialization=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ + ", serialization=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)
+ + ", reportPartitionStats=" + reportPartitionStats);
// counters
mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
@@ -412,4 +417,8 @@ public abstract class ExternalSorter {
.findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
statsReporter.reportItemsProcessed(outputRecords);
}
+
+ public boolean reportDetailedPartitionStats() {
+ return reportPartitionStats.isPrecise();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 5695bde..897d7d7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -347,9 +347,10 @@ public class PipelinedSorter extends ExternalSorter {
private void sendPipelinedShuffleEvents() throws IOException{
List<Event> events = Lists.newLinkedList();
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
- ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
- (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
- pathComponent, partitionStats);
+ ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
+ outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
+ partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
+ reportDetailedPartitionStats());
outputContext.sendEvents(events);
LOG.info(outputContext.getDestinationVertexName() +
": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
@@ -671,7 +672,8 @@ public class PipelinedSorter extends ExternalSorter {
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, i, indexCacheList.get(i), partitions,
- sendEmptyPartitionDetails, pathComponent, partitionStats);
+ sendEmptyPartitionDetails, pathComponent, partitionStats,
+ reportDetailedPartitionStats());
LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
}
outputContext.sendEvents(events);
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index a6a60c2..69bfdb8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -1133,7 +1133,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
- partitionStats);
+ partitionStats, reportDetailedPartitionStats());
LOG.info(outputContext.getDestinationVertexName() + ": " +
"Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 76075bb..152096c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -56,8 +56,8 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -65,9 +65,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -150,6 +148,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
private final Condition spillInProgress = spillLock.newCondition();
private final boolean pipelinedShuffle;
+ // How partition stats should be reported.
+ final ReportPartitionStats reportPartitionStats;
private final long indexFileSizeEstimate;
@@ -208,7 +208,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
.build());
spillExecutor = MoreExecutors.listeningDecorator(executor);
numRecordsPerPartition = new int[numPartitions];
- sizePerPartition = new long[numPartitions];
+ reportPartitionStats = ReportPartitionStats.fromString(
+ conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+ sizePerPartition = (reportPartitionStats.isEnabled()) ?
+ new long[numPartitions] : null;
outputLargeRecordsCounter = outputContext.getCounters().findCounter(
TaskCounter.OUTPUT_LARGE_RECORDS);
@@ -233,7 +237,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
+ ", sizePerBuffer=" + sizePerBuffer
+ ", skipBuffers=" + skipBuffers
+ ", pipelinedShuffle=" + pipelinedShuffle
- + ", numPartitions=" + numPartitions);
+ + ", numPartitions=" + numPartitions
+ + ", reportPartitionStats=" + reportPartitionStats);
}
private void computeNumBuffersAndSize(int bufferLimit) {
@@ -364,10 +369,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
}
+ private boolean reportPartitionStats() {
+ return (sizePerPartition != null);
+ }
+
private void updateGlobalStats(WrappedBuffer buffer) {
for (int i = 0; i < numPartitions; i++) {
numRecordsPerPartition[i] += buffer.recordsPerPartition[i];
- sizePerPartition[i] += buffer.sizePerPartition[i];
+ if (reportPartitionStats()) {
+ sizePerPartition[i] += buffer.sizePerPartition[i];
+ }
}
}
@@ -529,7 +540,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
if (outputRecordsCounter.getValue() == 0) {
emptyPartitions.set(0);
}
- sizePerPartition[0] = rawLen;
+ if (reportPartitionStats()) {
+ sizePerPartition[0] = rawLen;
+ }
cleanupCurrentBuffer();
outputBytesWithOverheadCounter.increment(rawLen);
@@ -575,37 +588,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
return emptyPartitions;
}
- private Event generateVMEvent() throws IOException {
- return generateVMEvent(this.sizePerPartition);
+ public boolean reportDetailedPartitionStats() {
+ return reportPartitionStats.isPrecise();
}
- private Event generateVMEvent(long[] sizePerPartition) throws IOException {
- ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
- ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
-
- long outputSize = outputContext.getCounters().
- findCounter(TaskCounter.OUTPUT_BYTES).getValue();
-
- // Set this information only when required. In pipelined shuffle,
- // multiple events would end up adding up to final output size.
- // This is needed for auto-reduce parallelism to work properly.
- vmBuilder.setOutputSize(outputSize);
-
- //set partition stats
- if (sizePerPartition != null && sizePerPartition.length > 0) {
- RoaringBitmap stats = ShuffleUtils.getPartitionStatsForPhysicalOutput(
- sizePerPartition);
- DataOutputBuffer dout = new DataOutputBuffer();
- stats.serialize(dout);
- ByteString partitionStatsBytes =
- TezCommonUtils.compressByteArrayToByteString(dout.getData());
- vmBuilder.setPartitionStats(partitionStatsBytes);
- }
-
- VertexManagerEvent vmEvent = VertexManagerEvent.create(
- outputContext.getDestinationVertexName(),
- vmBuilder.build().toByteString().asReadOnlyByteBuffer());
- return vmEvent;
+ private Event generateVMEvent() throws IOException {
+ return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition,
+ this.reportDetailedPartitionStats());
}
private Event generateDMEvent() throws IOException {
@@ -667,7 +656,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
if (currentBuffer.nextPosition == 0) {
if (pipelinedShuffle) {
List<Event> eventList = Lists.newLinkedList();
- eventList.add(generateVMEvent(new long[numPartitions]));
+ eventList.add(ShuffleUtils.generateVMEvent(outputContext,
+ reportPartitionStats() ? new long[numPartitions] : null,
+ reportDetailedPartitionStats()));
//Send final event with all empty partitions and null path component.
BitSet emptyPartitions = new BitSet(numPartitions);
emptyPartitions.flip(0, numPartitions);
@@ -844,7 +835,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
writer.append(key, value);
outputLargeRecordsCounter.increment(1);
numRecordsPerPartition[i]++;
- sizePerPartition[i] += writer.getRawLength();
+ if (reportPartitionStats()) {
+ sizePerPartition[i] += writer.getRawLength();
+ }
writer.close();
additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(),
@@ -985,7 +978,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
try {
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
if (isFinalUpdate) {
- eventList.add(generateVMEvent(sizePerPartition));
+ eventList.add(ShuffleUtils.generateVMEvent(outputContext,
+ sizePerPartition, reportDetailedPartitionStats()));
}
Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
pathComponent, emptyPartitions);
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index c0b0760..9a3d778 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -200,7 +200,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
- sorter.getPartitionStats());
+ sorter.getPartitionStats(), sorter.reportDetailedPartitionStats());
}
return eventList;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 879c2e0..4f74f7d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -172,6 +172,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
}
// TODO Maybe add helper methods to extract keys
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 90c0ed4..c4b3b22 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -145,6 +145,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
}
// TODO Maybe add helper methods to extract keys
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 9b0fc16..f78cbac 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -41,9 +41,16 @@ message InputInformationEventPayloadProto {
optional int32 partition_range = 1;
}
+// DetailedPartitionStatsProto holds the size of each partition, in MB rounded up.
+// It is more precise than the RoaringBitmap-based partition_stats.
+message DetailedPartitionStatsProto {
+ repeated int32 size_in_mb = 1;
+}
+
message VertexManagerEventPayloadProto {
optional int64 output_size = 1;
optional bytes partition_stats = 2;
+ optional DetailedPartitionStatsProto detailed_partition_stats = 3;
}
message ShuffleEdgeManagerConfigPayloadProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index c542030..4233f5d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -161,8 +161,9 @@ public class TestShuffleUtils {
int spillId = 0;
int physicalOutputs = 10;
String pathComponent = "/attempt_x_y_0/file.out";
- ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
- spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
+ ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+ outputContext, spillId, new TezSpillRecord(indexFile, conf),
+ physicalOutputs, true, pathComponent, null, false);
Assert.assertTrue(events.size() == 1);
Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -199,8 +200,9 @@ public class TestShuffleUtils {
String pathComponent = "/attempt_x_y_0/file.out";
//normal code path where we do final merge all the time
- ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
- spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
+ ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+ outputContext, spillId, new TezSpillRecord(indexFile, conf),
+ physicalOutputs, true, pathComponent, null, false);
Assert.assertTrue(events.size() == 2); //one for VM
Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -239,8 +241,9 @@ public class TestShuffleUtils {
String pathComponent = "/attempt_x_y_0/file.out";
//normal code path where we do final merge all the time
- ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext,
- spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
+ ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent,
+ outputContext, spillId, new TezSpillRecord(indexFile, conf),
+ physicalOutputs, true, pathComponent, null, false);
Assert.assertTrue(events.size() == 2); //one for VM
Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 70819e5..80e7b14 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -40,6 +40,7 @@ import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -404,10 +405,11 @@ public class TestPipelinedSorter {
writeData(sorter, numKeys, keySize);
//partition stats;
- boolean partitionStats = conf.getBoolean(TezRuntimeConfiguration
- .TEZ_RUNTIME_REPORT_PARTITION_STATS, TezRuntimeConfiguration
- .TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT);
- if (partitionStats) {
+ ReportPartitionStats partitionStats =
+ ReportPartitionStats.fromString(conf.get(
+ TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+ if (partitionStats.isEnabled()) {
assertTrue(sorter.getPartitionStats() != null);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 9d2b615..41b2b97 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -79,6 +79,7 @@ import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
@@ -115,14 +116,28 @@ public class TestUnorderedPartitionedKVWriter {
private static FileSystem localFs;
private boolean shouldCompress;
+ private ReportPartitionStats reportPartitionStats;
- public TestUnorderedPartitionedKVWriter(boolean shouldCompress) {
+ public TestUnorderedPartitionedKVWriter(boolean shouldCompress,
+ ReportPartitionStats reportPartitionStats) {
this.shouldCompress = shouldCompress;
+ this.reportPartitionStats = reportPartitionStats;
}
- @Parameters
+ @SuppressWarnings("deprecation")
+ @Parameterized.Parameters(name = "test[{0}, {1}]")
public static Collection<Object[]> data() {
- Object[][] data = new Object[][] { { false }, { true } };
+ Object[][] data = new Object[][] { // {shouldCompress, reportPartitionStats}
+ { false, ReportPartitionStats.DISABLED },
+ { false, ReportPartitionStats.ENABLED },
+ { false, ReportPartitionStats.NONE },
+ { false, ReportPartitionStats.MEMORY_OPTIMIZED },
+ { false, ReportPartitionStats.PRECISE },
+ { true, ReportPartitionStats.DISABLED },
+ { true, ReportPartitionStats.ENABLED },
+ { true, ReportPartitionStats.NONE },
+ { true, ReportPartitionStats.MEMORY_OPTIMIZED },
+ { true, ReportPartitionStats.PRECISE }};
return Arrays.asList(data);
}
@@ -415,36 +430,54 @@ public class TestUnorderedPartitionedKVWriter {
assertEquals(0, expectedValues.size());
}
- private long[] getPartitionStats(
- VertexManagerEvent vme) throws IOException {
+ private int[] getPartitionStats(VertexManagerEvent vme) throws IOException {
RoaringBitmap partitionStats = new RoaringBitmap();
ShuffleUserPayloads.VertexManagerEventPayloadProto
payload = ShuffleUserPayloads.VertexManagerEventPayloadProto
.parseFrom(ByteString.copyFrom(vme.getUserPayload()));
- assertTrue(payload.hasPartitionStats());
- ByteString compressedPartitionStats = payload.getPartitionStats();
- byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
- compressedPartitionStats);
- ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
- partitionStats.deserialize(new DataInputStream(bin));
- long[] stats = new long[partitionStats.getCardinality()];
- Iterator<Integer> it = partitionStats.iterator();
- final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
- final int RANGE_LEN = RANGES.length;
- while (it.hasNext()) {
- int pos = it.next();
- int index = ((pos) / RANGE_LEN);
- int rangeIndex = ((pos) % RANGE_LEN);
- if (RANGES[rangeIndex].getSizeInMB() > 0) {
- stats[index] += RANGES[rangeIndex].getSizeInMB();
+ if (!reportPartitionStats.isEnabled()) {
+ assertFalse(payload.hasPartitionStats());
+ assertFalse(payload.hasDetailedPartitionStats());
+ return null;
+ }
+ if (reportPartitionStats.isPrecise()) {
+ assertTrue(payload.hasDetailedPartitionStats());
+ List<Integer> sizeInMBList =
+ payload.getDetailedPartitionStats().getSizeInMbList();
+ int[] stats = new int[sizeInMBList.size()];
+ for (int i=0; i<sizeInMBList.size(); i++) {
+ stats[i] += sizeInMBList.get(i);
}
+ return stats;
+ } else {
+ assertTrue(payload.hasPartitionStats());
+ ByteString compressedPartitionStats = payload.getPartitionStats();
+ byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
+ compressedPartitionStats);
+ ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
+ partitionStats.deserialize(new DataInputStream(bin));
+ int[] stats = new int[partitionStats.getCardinality()];
+ Iterator<Integer> it = partitionStats.iterator();
+ final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
+ final int RANGE_LEN = RANGES.length;
+ while (it.hasNext()) {
+ int pos = it.next();
+ int index = ((pos) / RANGE_LEN);
+ int rangeIndex = ((pos) % RANGE_LEN);
+ if (RANGES[rangeIndex].getSizeInMB() > 0) {
+ stats[index] += RANGES[rangeIndex].getSizeInMB();
+ }
+ }
+ return stats;
}
- return stats;
}
private void verifyPartitionStats(VertexManagerEvent vme,
BitSet expectedPartitionsWithData) throws IOException {
- long[] stats = getPartitionStats(vme);
+ int[] stats = getPartitionStats(vme);
+ if (stats == null) {
+ return;
+ }
for (int i = 0; i < stats.length; i++) {
// The stats should be greater than zero if and only if
// the partition has data
@@ -922,6 +955,8 @@ public class TestUnorderedPartitionedKVWriter {
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
DefaultCodec.class.getName());
}
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ reportPartitionStats.getType());
return conf;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
index fabf52d..9d6ca50 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.junit.Test;
public class TestOrderedPartitionedKVEdgeConfig {
@@ -132,7 +133,8 @@ public class TestOrderedPartitionedKVEdgeConfig {
additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter");
- additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, "true");
+ additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ ReportPartitionStats.MEMORY_OPTIMIZED.getType());
additionalConfs.put("file.shouldExist", "file");
OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
@@ -173,9 +175,11 @@ public class TestOrderedPartitionedKVEdgeConfig {
assertEquals("io", outputConf.get("io.shouldExist"));
assertEquals("file", outputConf.get("file.shouldExist"));
assertEquals("fs", outputConf.get("fs.shouldExist"));
- assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration
- .TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ ReportPartitionStats partitionStats =
+ ReportPartitionStats.fromString(outputConf.get(
+ TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+ assertEquals(true, partitionStats.isEnabled());
assertEquals(3, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 8942f4b..93c4f92 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -38,6 +38,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
@@ -98,6 +99,7 @@ public class TestOnFileSortedOutput {
private boolean sendEmptyPartitionViaEvent;
//Partition index for which data should not be written to.
private int emptyPartitionIdx;
+ private ReportPartitionStats reportPartitionStats;
/**
* Constructor
@@ -107,13 +109,14 @@ public class TestOnFileSortedOutput {
* @param sorterThreads number of threads needed for sorter (required only for pipelined sorter)
* @param emptyPartitionIdx for which data should not be generated
*/
- public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, SorterImpl sorterImpl,
- int sorterThreads, int emptyPartitionIdx) throws IOException {
+ public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent,
+ SorterImpl sorterImpl, int sorterThreads, int emptyPartitionIdx,
+ ReportPartitionStats reportPartitionStats) throws IOException {
this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent;
this.emptyPartitionIdx = emptyPartitionIdx;
this.sorterImpl = sorterImpl;
this.sorterThreads = sorterThreads;
-
+ this.reportPartitionStats = reportPartitionStats;
conf = new Configuration();
workingDir = new Path(".", this.getClass().getName());
@@ -135,7 +138,8 @@ public class TestOnFileSortedOutput {
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
sendEmptyPartitionViaEvent);
-
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ reportPartitionStats.getType());
outputSize.set(0);
numRecords.set(0);
fs.mkdirs(workingDir);
@@ -147,27 +151,39 @@ public class TestOnFileSortedOutput {
fs.delete(workingDir, true);
}
- @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}]")
+ @SuppressWarnings("deprecation")
+ @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}, {4}]")
public static Collection<Object[]> getParameters() {
Collection<Object[]> parameters = new ArrayList<Object[]>();
- //empty_partition_via_events_enabled, noOfSortThreads, partitionToBeEmpty
- parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1 });
- parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0 });
- parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1 });
- parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0 });
+ //empty_partition_via_events_enabled, sorterImpl, noOfSortThreads,
+ // partitionToBeEmpty, reportPartitionStats
+ parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1,
+ ReportPartitionStats.ENABLED });
+ parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0,
+ ReportPartitionStats.ENABLED });
+ parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1,
+ ReportPartitionStats.ENABLED });
+ parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0,
+ ReportPartitionStats.ENABLED });
+ parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0,
+ ReportPartitionStats.PRECISE });
//Pipelined sorter
- parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1 });
- parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0 });
- parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1 });
- parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0 });
-
+ parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1,
+ ReportPartitionStats.ENABLED });
+ parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0,
+ ReportPartitionStats.ENABLED });
+ parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1,
+ ReportPartitionStats.ENABLED });
+ parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0,
+ ReportPartitionStats.ENABLED });
+ parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0,
+ ReportPartitionStats.PRECISE });
return parameters;
}
private void startSortedOutput(int partitions) throws Exception {
OutputContext context = createTezOutputContext();
- conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
doReturn(payLoad).when(context).getUserPayload();
@@ -299,7 +315,11 @@ public class TestOnFileSortedOutput {
.parseFrom(
ByteString.copyFrom(((VertexManagerEvent) eventList.get(0)).getUserPayload()));
- assertTrue(vmPayload.hasPartitionStats());
+ if (reportPartitionStats.isPrecise()) {
+ assertTrue(vmPayload.hasDetailedPartitionStats());
+ } else {
+ assertTrue(vmPayload.hasPartitionStats());
+ }
assertEquals(HOST, payload.getHost());
assertEquals(PORT, payload.getPort());
assertEquals(UniqueID, payload.getPathComponent());