You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/09/21 15:55:08 UTC
tez git commit: TEZ-3163. Reuse and tune Inflaters and Deflaters to
speed DME processing (jeagles)
Repository: tez
Updated Branches:
refs/heads/master 9cf25d142 -> da4098b9d
TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/da4098b9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/da4098b9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/da4098b9
Branch: refs/heads/master
Commit: da4098b9d6f72e6d4aacc1623622a0875408d2ba
Parents: 9cf25d1
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Sep 21 10:54:47 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Sep 21 10:54:47 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/TezCommonUtils.java | 38 +++++++++++++++++---
.../apache/tez/dag/api/DagTypeConverters.java | 5 +--
.../tez/dag/api/TestDagTypeConverters.java | 2 +-
.../apache/tez/dag/history/utils/DAGUtils.java | 16 +++++----
.../vertexmanager/ShuffleVertexManagerBase.java | 6 +++-
.../library/common/shuffle/ShuffleUtils.java | 21 ++++++-----
.../impl/ShuffleInputEventHandlerImpl.java | 5 ++-
.../ShuffleInputEventHandlerOrderedGrouped.java | 6 ++--
.../common/sort/impl/PipelinedSorter.java | 8 +++--
.../common/sort/impl/dflt/DefaultSorter.java | 6 +++-
.../writers/UnorderedPartitionedKVWriter.java | 11 +++---
.../output/OrderedPartitionedKVOutput.java | 8 +++--
.../library/output/UnorderedKVOutput.java | 3 +-
.../output/UnorderedPartitionedKVOutput.java | 3 +-
.../common/shuffle/TestShuffleUtils.java | 6 ++--
16 files changed, 103 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c7f540b..3a55ec7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing
TEZ-3434. Add unit tests for flushing of recovery events.
TEZ-3317. Speculative execution starts too early due to 0 progress.
TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM.
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index e4cf028..afdce39 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.StringTokenizer;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import org.apache.commons.io.IOUtils;
@@ -345,13 +346,35 @@ public class TezCommonUtils {
}
}
+ private static final boolean NO_WRAP = true;
+
+ @Private
+ public static Deflater newBestCompressionDeflater() {
+ return new Deflater(Deflater.BEST_COMPRESSION, NO_WRAP);
+ }
+
+ @Private
+ public static Deflater newBestSpeedDeflater() {
+ return new Deflater(Deflater.BEST_SPEED, NO_WRAP);
+ }
+
+ @Private
+ public static Inflater newInflater() {
+ return new Inflater(NO_WRAP);
+ }
+
@Private
public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
+ return compressByteArrayToByteString(inBytes, newBestCompressionDeflater());
+ }
+
+ @Private
+ public static ByteString compressByteArrayToByteString(byte[] inBytes, Deflater deflater) throws IOException {
+ deflater.reset();
ByteString.Output os = ByteString.newOutput();
DeflaterOutputStream compressOs = null;
try {
- compressOs = new DeflaterOutputStream(os, new Deflater(
- Deflater.BEST_COMPRESSION));
+ compressOs = new DeflaterOutputStream(os, deflater);
compressOs.write(inBytes);
compressOs.finish();
ByteString byteString = os.toByteString();
@@ -365,9 +388,14 @@ public class TezCommonUtils {
@Private
public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
- InflaterInputStream in = new InflaterInputStream(byteString.newInput());
- byte[] bytes = IOUtils.toByteArray(in);
- return bytes;
+ return decompressByteStringToByteArray(byteString, newInflater());
+ }
+
+ @Private
+ public static byte[] decompressByteStringToByteArray(ByteString byteString, Inflater inflater) throws IOException {
+ inflater.reset();
+ return IOUtils.toByteArray(new InflaterInputStream(byteString.newInput(), inflater));
+
}
public static String getCredentialsInfo(Credentials credentials, String identifier) {
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index cefe026..c5d9c0b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.zip.Inflater;
import java.util.Map.Entry;
import javax.annotation.Nullable;
@@ -369,12 +370,12 @@ public class DagTypeConverters {
return builder.build();
}
- public static String getHistoryTextFromProto(TezEntityDescriptorProto proto) {
+ public static String getHistoryTextFromProto(TezEntityDescriptorProto proto, Inflater inflater) {
if (!proto.hasHistoryText()) {
return null;
}
try {
- return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText()),
+ return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText(), inflater),
"UTF-8");
} catch (IOException e) {
throw new TezUncheckedException(e);
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index dc04f2d..265fce9 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -72,7 +72,7 @@ public class TestDagTypeConverters {
Assert.assertNull(inputDescriptor.getHistoryText());
// Check history text value
- String actualHistoryText = DagTypeConverters.getHistoryTextFromProto(proto);
+ String actualHistoryText = DagTypeConverters.getHistoryTextFromProto(proto, TezCommonUtils.newInflater());
Assert.assertEquals(historytext, actualHistoryText);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index d8d2407..dce9e52 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -28,9 +28,11 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import java.util.zip.Inflater;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
@@ -186,7 +188,7 @@ public class DAGUtils {
}
public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException {
-
+ final Inflater inflater = TezCommonUtils.newInflater();
final String VERSION_KEY = "version";
final int version = 2;
Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
@@ -208,7 +210,7 @@ public class DAGUtils {
if (vertexPlan.getProcessorDescriptor().hasHistoryText()) {
vertexMap.put(USER_PAYLOAD_AS_TEXT,
DagTypeConverters.getHistoryTextFromProto(
- vertexPlan.getProcessorDescriptor()));
+ vertexPlan.getProcessorDescriptor(), inflater));
}
}
@@ -232,7 +234,7 @@ public class DAGUtils {
if (input.getIODescriptor().hasHistoryText()) {
inputMap.put(USER_PAYLOAD_AS_TEXT,
DagTypeConverters.getHistoryTextFromProto(
- input.getIODescriptor()));
+ input.getIODescriptor(), inflater));
}
inputsList.add(inputMap);
}
@@ -250,7 +252,7 @@ public class DAGUtils {
if (output.getIODescriptor().hasHistoryText()) {
outputMap.put(USER_PAYLOAD_AS_TEXT,
DagTypeConverters.getHistoryTextFromProto(
- output.getIODescriptor()));
+ output.getIODescriptor(), inflater));
}
outputsList.add(outputMap);
}
@@ -282,12 +284,12 @@ public class DAGUtils {
if (edgePlan.getEdgeSource().hasHistoryText()) {
edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT,
DagTypeConverters.getHistoryTextFromProto(
- edgePlan.getEdgeSource()));
+ edgePlan.getEdgeSource(), inflater));
}
if (edgePlan.getEdgeDestination().hasHistoryText()) {
edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
DagTypeConverters.getHistoryTextFromProto(
- edgePlan.getEdgeDestination()));
+ edgePlan.getEdgeDestination(), inflater));
} // TEZ-2286 this is missing edgemanager descriptor for custom edge
edgesList.add(edgeMap);
}
@@ -319,7 +321,7 @@ public class DAGUtils {
if (edgeMergedInputInfo.getMergedInput().hasHistoryText()) {
edgeMergedInput.put(USER_PAYLOAD_AS_TEXT,
DagTypeConverters.getHistoryTextFromProto(
- edgeMergedInputInfo.getMergedInput()));
+ edgeMergedInputInfo.getMergedInput(), inflater));
}
}
edgeMergedInputs.add(edgeMergedInput);
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
index 9b88cfd..dc6cd3b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
@@ -66,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Inflater;
/**
* Starts scheduling tasks when number of completed source tasks crosses
@@ -104,6 +105,8 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
long[] stats; //approximate amount of data to be fetched
Configuration conf;
ShuffleVertexManagerBaseConfig config;
+ // requires synchronized access
+ final Inflater inflater;
/**
* Used when automatic parallelism is enabled
@@ -198,6 +201,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
public ShuffleVertexManagerBase(VertexManagerPluginContext context) {
super(context);
+ inflater = TezCommonUtils.newInflater();
}
@Override
@@ -336,7 +340,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
RoaringBitmap partitionStats = new RoaringBitmap();
ByteString compressedPartitionStats = proto.getPartitionStats();
byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
- compressedPartitionStats);
+ compressedPartitionStats, inflater);
NonSyncByteArrayInputStream bin = new NonSyncByteArrayInputStream(rawData);
partitionStats.deserialize(new DataInputStream(bin));
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index d74e447..aa07233 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -29,6 +29,7 @@ import java.text.DecimalFormat;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
+import java.util.zip.Deflater;
import javax.annotation.Nullable;
import javax.crypto.SecretKey;
@@ -278,12 +279,13 @@ public class ShuffleUtils {
* @param finalMergeEnabled
* @param isLastEvent
* @param pathComponent
+ * @param deflater
* @return ByteBuffer
* @throws IOException
*/
static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails,
int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context,
- int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent)
+ int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Deflater deflater)
throws IOException {
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
@@ -302,7 +304,7 @@ public class ShuffleUtils {
if (emptyPartitions > 0) {
ByteString emptyPartitionsBytesString =
TezCommonUtils.compressByteArrayToByteString(
- TezUtilsInternal.toByteArray(emptyPartitionDetails));
+ TezUtilsInternal.toByteArray(emptyPartitionDetails), deflater);
payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs="
+ numPhysicalOutputs + ", emptyPartitions=" + emptyPartitions
@@ -339,13 +341,14 @@ public class ShuffleUtils {
* @param context
* @param generateVmEvent whether to generate a vm event or not
* @param isCompositeEvent whether to generate a CompositeDataMovementEvent or a DataMovementEvent
+ * @param deflater
* @throws IOException
*/
public static void generateEventsForNonStartedOutput(List<Event> eventList,
int numPhysicalOutputs,
OutputContext context,
boolean generateVmEvent,
- boolean isCompositeEvent) throws
+ boolean isCompositeEvent, Deflater deflater) throws
IOException {
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
@@ -369,7 +372,7 @@ public class ShuffleUtils {
emptyPartitionDetails.set(0, numPhysicalOutputs, true);
ByteString emptyPartitionsBytesString =
TezCommonUtils.compressByteArrayToByteString(
- TezUtilsInternal.toByteArray(emptyPartitionDetails));
+ TezUtilsInternal.toByteArray(emptyPartitionDetails), deflater);
payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
payloadBuilder.setRunDuration(0);
DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
@@ -403,7 +406,7 @@ public class ShuffleUtils {
public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
- @Nullable long[] partitionStats, boolean reportDetailedPartitionStats)
+ @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Deflater deflater)
throws IOException {
Preconditions.checkArgument(eventList != null, "EventList can't be null");
@@ -421,11 +424,11 @@ public class ShuffleUtils {
ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs,
spillRecord, context, spillId,
- finalMergeEnabled, isLastEvent, pathComponent);
+ finalMergeEnabled, isLastEvent, pathComponent, deflater);
if (finalMergeEnabled || isLastEvent) {
VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,
- reportDetailedPartitionStats);
+ reportDetailedPartitionStats, deflater);
eventList.add(vmEvent);
}
@@ -435,7 +438,7 @@ public class ShuffleUtils {
}
public static VertexManagerEvent generateVMEvent(OutputContext context,
- long[] sizePerPartition, boolean reportDetailedPartitionStats)
+ long[] sizePerPartition, boolean reportDetailedPartitionStats, Deflater deflater)
throws IOException {
ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
@@ -459,7 +462,7 @@ public class ShuffleUtils {
DataOutputBuffer dout = new DataOutputBuffer();
stats.serialize(dout);
ByteString partitionStatsBytes =
- TezCommonUtils.compressByteArrayToByteString(dout.getData());
+ TezCommonUtils.compressByteArrayToByteString(dout.getData(), deflater);
vmBuilder.setPartitionStats(partitionStatsBytes);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index adc3432..7d9eacf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Inflater;
import com.google.protobuf.ByteString;
@@ -59,6 +60,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
private final int ifileReadAheadLength;
private final boolean useSharedInputs;
private final InputContext inputContext;
+ private final Inflater inflater;
private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
private final AtomicInteger numDmeEvents = new AtomicInteger(0);
@@ -78,6 +80,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
// this currently relies on a user to enable the flag
// expand on idea based on vertex parallelism and num inputs
this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0);
+ this.inflater = TezCommonUtils.newInflater();
}
@Override
@@ -131,7 +134,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
if (shufflePayload.hasEmptyPartitions()) {
byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
- .getEmptyPartitions());
+ .getEmptyPartitions(), inflater);
BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
if (emptyPartionsBitSet.get(srcIndex)) {
InputAttemptIdentifier srcAttemptIdentifier =
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 7991485..f6f6da1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Inflater;
import com.google.protobuf.ByteString;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
@@ -47,7 +48,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
private final ShuffleScheduler scheduler;
private final InputContext inputContext;
-
+ private final Inflater inflater;
private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
private final AtomicInteger numDmeEvents = new AtomicInteger(0);
@@ -58,6 +59,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
ShuffleScheduler scheduler) {
this.inputContext = inputContext;
this.scheduler = scheduler;
+ this.inflater = TezCommonUtils.newInflater();
}
@Override
@@ -110,7 +112,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
if (shufflePayload.hasEmptyPartitions()) {
try {
- byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
+ byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater);
BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
if (emptyPartitionsBitSet.get(partitionId)) {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 609e9ff..9b3aadb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.zip.Deflater;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -52,6 +53,7 @@ import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -113,6 +115,7 @@ public class PipelinedSorter extends ExternalSorter {
private int bufferIndex = -1;
private final int MIN_BLOCK_SIZE;
private final boolean lazyAllocateMem;
+ private final Deflater deflater;
// TODO Set additional countesr - total bytes written, spills etc.
@@ -224,6 +227,7 @@ public class PipelinedSorter extends ExternalSorter {
valSerializer.open(span.out);
keySerializer.open(span.out);
minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
+ deflater = TezCommonUtils.newBestCompressionDeflater();
}
ByteBuffer allocateSpace() {
@@ -350,7 +354,7 @@ public class PipelinedSorter extends ExternalSorter {
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
- reportDetailedPartitionStats());
+ reportDetailedPartitionStats(), deflater);
outputContext.sendEvents(events);
LOG.info(outputContext.getDestinationVertexName() +
": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
@@ -673,7 +677,7 @@ public class PipelinedSorter extends ExternalSorter {
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, i, indexCacheList.get(i), partitions,
sendEmptyPartitionDetails, pathComponent, partitionStats,
- reportDetailedPartitionStats());
+ reportDetailedPartitionStats(), deflater);
LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
}
outputContext.sendEvents(events);
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 873d8e1..b5c3071 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.Deflater;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -41,6 +42,7 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.runtime.api.Event;
@@ -112,6 +114,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
final BlockingBuffer bb = new BlockingBuffer();
volatile boolean spillThreadRunning = false;
final SpillThread spillThread = new SpillThread();
+ private final Deflater deflater;
final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
@@ -127,6 +130,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
super(outputContext, conf, numOutputs, initialMemoryAvailable);
+ deflater = TezCommonUtils.newBestCompressionDeflater();
// sanity checks
final float spillper = this.conf.getFloat(
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
@@ -1133,7 +1137,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
- partitionStats, reportDetailedPartitionStats());
+ partitionStats, reportDetailedPartitionStats(), deflater);
LOG.info(outputContext.getDestinationVertexName() + ": " +
"Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index eff29a5..0f38a29 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.Deflater;
import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -116,6 +117,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
// uncompressed size for each partition
private final long[] sizePerPartition;
private volatile long spilledSize = 0;
+ private final Deflater deflater;
/**
* Represents final number of records written (spills are not counted)
@@ -158,6 +160,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
super(outputContext, conf, numOutputs);
Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");
+ this.deflater = TezCommonUtils.newBestCompressionDeflater();
this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
//Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
// this case. Add it later if needed.
@@ -594,7 +597,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
private Event generateVMEvent() throws IOException {
return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition,
- this.reportDetailedPartitionStats());
+ this.reportDetailedPartitionStats(), deflater);
}
private Event generateDMEvent() throws IOException {
@@ -614,7 +617,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
if (emptyPartitions.cardinality() != 0) {
// Empty partitions exist
ByteString emptyPartitionsByteString =
- TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions));
+ TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions), deflater);
payloadBuilder.setEmptyPartitions(emptyPartitionsByteString);
}
@@ -658,7 +661,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
List<Event> eventList = Lists.newLinkedList();
eventList.add(ShuffleUtils.generateVMEvent(outputContext,
reportPartitionStats() ? new long[numPartitions] : null,
- reportDetailedPartitionStats()));
+ reportDetailedPartitionStats(), deflater));
//Send final event with all empty partitions and null path component.
BitSet emptyPartitions = new BitSet(numPartitions);
emptyPartitions.flip(0, numPartitions);
@@ -979,7 +982,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
if (isFinalUpdate) {
eventList.add(ShuffleUtils.generateVMEvent(outputContext,
- sizePerPartition, reportDetailedPartitionStats()));
+ sizePerPartition, reportDetailedPartitionStats(), deflater));
}
Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
pathComponent, emptyPartitions);
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 9a3d778..5f6a304 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
@@ -69,6 +71,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
private long startTime;
private long endTime;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ private final Deflater deflater;
@VisibleForTesting
boolean pipelinedShuffle;
@@ -78,6 +81,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
public OrderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
super(outputContext, numPhysicalOutputs);
+ deflater = TezCommonUtils.newBestCompressionDeflater();
}
@Override
@@ -200,14 +204,14 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
- sorter.getPartitionStats(), sorter.reportDetailedPartitionStats());
+ sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), deflater);
}
return eventList;
}
private List<Event> generateEmptyEvents() throws IOException {
List<Event> eventList = Lists.newLinkedList();
- ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true);
+ ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true, deflater);
return eventList;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 4f74f7d..cc7b27c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.TezConfiguration;
@@ -133,7 +134,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
returnEvents = new LinkedList<Event>();
ShuffleUtils
.generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(),
- false, false);
+ false, false, TezCommonUtils.newBestCompressionDeflater());
}
// This works for non-started outputs since new counters will be created with an initial value of 0
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index c4b3b22..3d16181 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.TezConfiguration;
@@ -110,7 +111,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
returnEvents = new LinkedList<Event>();
ShuffleUtils
.generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(),
- false, true);
+ false, true, TezCommonUtils.newBestCompressionDeflater());
}
// This works for non-started outputs since new counters will be created with an initial value of 0
http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 4233f5d..496468b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -163,7 +163,7 @@ public class TestShuffleUtils {
String pathComponent = "/attempt_x_y_0/file.out";
ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
outputContext, spillId, new TezSpillRecord(indexFile, conf),
- physicalOutputs, true, pathComponent, null, false);
+ physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater());
Assert.assertTrue(events.size() == 1);
Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -202,7 +202,7 @@ public class TestShuffleUtils {
//normal code path where we do final merge all the time
ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
outputContext, spillId, new TezSpillRecord(indexFile, conf),
- physicalOutputs, true, pathComponent, null, false);
+ physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater());
Assert.assertTrue(events.size() == 2); //one for VM
Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -243,7 +243,7 @@ public class TestShuffleUtils {
//normal code path where we do final merge all the time
ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent,
outputContext, spillId, new TezSpillRecord(indexFile, conf),
- physicalOutputs, true, pathComponent, null, false);
+ physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater());
Assert.assertTrue(events.size() == 2); //one for VM
Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);