Posted to commits@tez.apache.org by ab...@apache.org on 2020/08/27 14:30:42 UTC
[tez] branch branch-0.9 updated: TEZ-3645: Reuse SerializationFactory while sorting, merging, and writing IFiles (Jonathan Turner Eagles reviewed by Rajesh Balamohan, Laszlo Bodor)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 27bffa3 TEZ-3645: Reuse SerializationFactory while sorting, merging, and writing IFiles (Jonathan Turner Eagles reviewed by Rajesh Balamohan, Laszlo Bodor)
27bffa3 is described below
commit 27bffa3dc686193b23c724f614c7ec1a9cb93d4f
Author: Jonathan Turner Eagles <je...@apache.org>
AuthorDate: Thu Aug 27 14:57:59 2020 +0200
TEZ-3645: Reuse SerializationFactory while sorting, merging, and writing IFiles (Jonathan Turner Eagles reviewed by Rajesh Balamohan, Laszlo Bodor)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../common/serializer/SerializationContext.java | 87 ++++++++++++++++++++++
.../shuffle/orderedgrouped/MergeManager.java | 72 +++++++++---------
.../library/common/sort/impl/ExternalSorter.java | 19 ++---
.../runtime/library/common/sort/impl/IFile.java | 28 ++++---
.../library/common/sort/impl/PipelinedSorter.java | 28 ++++---
.../library/common/sort/impl/TezMerger.java | 50 ++++++-------
.../common/sort/impl/dflt/DefaultSorter.java | 34 +++++----
.../writers/BaseUnorderedPartitionedKVWriter.java | 9 ++-
.../writers/UnorderedPartitionedKVWriter.java | 8 +-
.../runtime/library/common/TestValuesIterator.java | 57 +++++++-------
.../common/readers/TestUnorderedKVReader.java | 5 +-
.../shuffle/orderedgrouped/TestMergeManager.java | 16 ++--
.../library/common/sort/impl/TestIFile.java | 35 ++++-----
.../library/common/sort/impl/TestTezMerger.java | 38 +++++-----
14 files changed, 298 insertions(+), 188 deletions(-)
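
For context on the change itself: before this patch, code paths such as ExternalSorter and each IFile.Writer constructor built a fresh SerializationFactory from the Configuration whenever they needed a Serializer, re-resolving the configured serialization classes every time. The patch hoists that lookup into a SerializationContext that is created once per component and reused. A minimal sketch of the before/after shape (variable names are illustrative, not from the patch):

    // Before: each writer resolved its serializers through a new factory,
    // which re-reads the io.serializations setting from the Configuration.
    SerializationFactory factory = new SerializationFactory(conf);
    Serializer<?> keySerializer = factory.getSerializer(keyClass);

    // After: resolve the Serialization instances once, then reuse them
    // for every subsequent sort/merge/IFile write.
    SerializationContext serContext = new SerializationContext(conf);
    Serializer<?> cachedKeySerializer = serContext.getKeySerializer();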
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/SerializationContext.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/SerializationContext.java
new file mode 100644
index 0000000..2398b8f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/SerializationContext.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.serializer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+
+/**
+ * SerializationContext is a wrapper class for serialization related fields.
+ */
+public class SerializationContext {
+
+ private Class<?> keyClass;
+ private Class<?> valueClass;
+ private Serialization<?> keySerialization;
+ private Serialization<?> valSerialization;
+
+ public SerializationContext(Configuration conf) {
+ this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ this.valueClass = ConfigUtils.getIntermediateInputValueClass(conf);
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ if (keyClass != null) {
+ this.keySerialization = serializationFactory.getSerialization(keyClass);
+ }
+ if (valueClass != null) {
+ this.valSerialization = serializationFactory.getSerialization(valueClass);
+ }
+ }
+
+ public SerializationContext(Class<?> keyClass, Class<?> valueClass,
+ Serialization<?> keySerialization, Serialization<?> valSerialization) {
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+ this.keySerialization = keySerialization;
+ this.valSerialization = valSerialization;
+ }
+
+ public Class<?> getKeyClass() {
+ return keyClass;
+ }
+
+ public Class<?> getValueClass() {
+ return valueClass;
+ }
+
+ public Serialization<?> getKeySerialization() {
+ return keySerialization;
+ }
+
+ public Serialization<?> getValSerialization() {
+ return valSerialization;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Serializer<?> getKeySerializer() {
+ return keySerialization.getSerializer((Class) keyClass);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Serializer<?> getValueSerializer() {
+ return valSerialization.getSerializer((Class) valueClass);
+ }
+
+ public void applyToConf(Configuration conf) {
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClass.getName());
+ }
+}
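
A hedged usage sketch of the class just added, mirroring how the MergeManager changes below hand the cached Serialization instances to an IFile.Writer (rfs, outputPath, and codec are assumed to exist in the caller, as they do in MergeManager):

    // Build the context once from the job Configuration, then fan the
    // cached Serialization instances out to each Writer that needs them.
    SerializationContext serContext = new SerializationContext(conf);
    Writer writer = new Writer(serContext.getKeySerialization(),
        serContext.getValSerialization(), rfs, outputPath,
        serContext.getKeyClass(), serContext.getValueClass(), codec,
        null, null);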
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 2e5cc20..78ce3e8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.FileChunk;
import org.apache.hadoop.io.RawComparator;
@@ -47,6 +46,7 @@ import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
@@ -158,6 +158,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
private final boolean cleanup;
+ private SerializationContext serializationContext;
+
/**
* Construct the MergeManager. Must call start before it becomes usable.
*/
@@ -297,21 +299,21 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
+ ", mergeThreshold: " + this.mergeThreshold);
}
- boolean allowMemToMemMerge =
- conf.getBoolean(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
+ boolean allowMemToMemMerge =
+ conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
- if (allowMemToMemMerge) {
- this.memToMemMerger =
- new IntermediateMemoryToMemoryMerger(this,
- memToMemMergeOutputsThreshold);
- } else {
- this.memToMemMerger = null;
- }
-
- this.inMemoryMerger = new InMemoryMerger(this);
-
- this.onDiskMerger = new OnDiskMerger(this);
+ if (allowMemToMemMerge) {
+ this.memToMemMerger =
+ new IntermediateMemoryToMemoryMerger(this, memToMemMergeOutputsThreshold);
+ } else {
+ this.memToMemMerger = null;
+ }
+
+ this.inMemoryMerger = new InMemoryMerger(this);
+
+ this.onDiskMerger = new OnDiskMerger(this);
+
+ this.serializationContext = new SerializationContext(conf);
}
@Private
@@ -791,8 +793,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
// TODO Is this doing any combination ?
TezRawKeyValueIterator rIter =
TezMerger.merge(conf, rfs,
- ConfigUtils.getIntermediateInputKeyClass(conf),
- ConfigUtils.getIntermediateInputValueClass(conf),
+ serializationContext,
inMemorySegments, inMemorySegments.size(),
new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
@@ -873,10 +874,10 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
long outFileLen = 0;
try {
writer =
- new Writer(conf, rfs, outputPath,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, null, null);
+ new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), rfs, outputPath,
+ serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+ null, null);
TezRawKeyValueIterator rIter = null;
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
@@ -885,8 +886,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
tmpDir = new Path(inputContext.getUniqueIdentifier());
// Nothing actually materialized to disk - controlled by setting sort-factor to #segments.
rIter = TezMerger.merge(conf, rfs,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+ serializationContext,
inMemorySegments, inMemorySegments.size(),
tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
progressable, spilledRecordsCounter, null, additionalBytesRead, null);
@@ -1014,16 +1014,14 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf);
outputPath = outputPath.suffix(Constants.MERGED_OUTPUT_PREFIX + mergeFileSequenceId.getAndIncrement());
- Writer writer =
- new Writer(conf, rfs, outputPath,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, null, null);
+ Writer writer = new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), rfs, outputPath,
+ serializationContext.getKeyClass(), serializationContext.getValueClass(), codec, null,
+ null);
tmpDir = new Path(inputContext.getUniqueIdentifier());
try {
TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+ serializationContext,
inputSegments,
ioSortFactor, tmpDir,
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
@@ -1152,8 +1150,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
inputContext.notifyProgress();
// merge config params
- Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
- Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
+ SerializationContext serContext = new SerializationContext(job);
final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
final RawComparator comparator =
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
@@ -1185,11 +1182,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
final Path outputPath =
mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
- final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
+ final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, serContext,
memDiskSegments, numMemDiskSegments, tmpDir, comparator, progressable,
spilledRecordsCounter, null, additionalBytesRead, null);
- final Writer writer = new Writer(job, fs, outputPath,
- keyClass, valueClass, codec, null, null);
+ final Writer writer = new Writer(serContext.getKeySerialization(),
+ serContext.getValSerialization(), fs, outputPath, serContext.getKeyClass(),
+ serContext.getValueClass(), codec, null, null);
try {
TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} catch (IOException e) {
@@ -1289,7 +1287,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
TezRawKeyValueIterator diskMerge = TezMerger.merge(
- job, fs, keyClass, valueClass, codec, diskSegments,
+ job, fs, serContext, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
progressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
diskSegments.clear();
@@ -1303,7 +1301,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
LOG.info(finalMergeLog.toString());
}
// This is doing nothing but creating an iterator over the segments.
- return TezMerger.merge(job, fs, keyClass, valueClass,
+ return TezMerger.merge(job, fs, serContext,
finalSegments, finalSegments.size(), tmpDir,
comparator, progressable, spilledRecordsCounter, null,
additionalBytesRead, null);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 6fb1d94..715d0e0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progressable;
@@ -57,13 +56,14 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitio
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.apache.tez.common.Preconditions;
-@SuppressWarnings({"unchecked", "rawtypes"})
+@SuppressWarnings({"rawtypes"})
public abstract class ExternalSorter {
private static final Logger LOG = LoggerFactory.getLogger(ExternalSorter.class);
@@ -102,10 +102,9 @@ public abstract class ExternalSorter {
protected final FileSystem rfs;
protected final TezTaskOutput mapOutputFile;
protected final int partitions;
- protected final Class keyClass;
- protected final Class valClass;
protected final RawComparator comparator;
- protected final SerializationFactory serializationFactory;
+
+ protected final SerializationContext serializationContext;
protected final Serializer keySerializer;
protected final Serializer valSerializer;
@@ -197,14 +196,12 @@ public abstract class ExternalSorter {
comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf);
// k/v serialization
- keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
- valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
- serializationFactory = new SerializationFactory(this.conf);
- keySerializer = serializationFactory.getSerializer(keyClass);
- valSerializer = serializationFactory.getSerializer(valClass);
+ this.serializationContext = new SerializationContext(this.conf);
+ keySerializer = serializationContext.getKeySerializer();
+ valSerializer = serializationContext.getValueSerializer();
LOG.info(outputContext.getDestinationVertexName() + " using: "
+ "memoryMb=" + assignedMb
- + ", keySerializerClass=" + keyClass
+ + ", keySerializerClass=" + serializationContext.getKeyClass()
+ ", valueSerializerClass=" + valSerializer
+ ", comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf)
+ ", partitioner=" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 52a9202..69b524b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.tez.common.counters.TezCounter;
@@ -120,12 +120,12 @@ public class IFile {
protected final boolean rle;
- public Writer(Configuration conf, FileSystem fs, Path file,
+ public Writer(Serialization keySerialization, Serialization valSerialization, FileSystem fs, Path file,
Class keyClass, Class valueClass,
CompressionCodec codec,
TezCounter writesCounter,
TezCounter serializedBytesCounter) throws IOException {
- this(conf, fs.create(file), keyClass, valueClass, codec,
+ this(keySerialization, valSerialization, fs.create(file), keyClass, valueClass, codec,
writesCounter, serializedBytesCounter);
ownOutputStream = true;
}
@@ -136,17 +136,17 @@ public class IFile {
this.rle = rle;
}
- public Writer(Configuration conf, FSDataOutputStream outputStream,
+ public Writer(Serialization keySerialization, Serialization valSerialization, FSDataOutputStream outputStream,
Class keyClass, Class valueClass, CompressionCodec codec, TezCounter writesCounter,
TezCounter serializedBytesCounter) throws IOException {
- this(conf, outputStream, keyClass, valueClass, codec, writesCounter,
+ this(keySerialization, valSerialization, outputStream, keyClass, valueClass, codec, writesCounter,
serializedBytesCounter, false);
}
- public Writer(Configuration conf, FSDataOutputStream outputStream,
- Class keyClass, Class valueClass,
- CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter,
- boolean rle) throws IOException {
+ public Writer(Serialization keySerialization, Serialization valSerialization, FSDataOutputStream outputStream,
+ Class keyClass, Class valueClass,
+ CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter,
+ boolean rle) throws IOException {
this.rawOut = outputStream;
this.writtenRecordsCounter = writesCounter;
this.serializedUncompressedBytes = serializedBytesCounter;
@@ -171,19 +171,17 @@ public class IFile {
if (keyClass != null) {
this.closeSerializers = true;
- SerializationFactory serializationFactory =
- new SerializationFactory(conf);
- this.keySerializer = serializationFactory.getSerializer(keyClass);
+ this.keySerializer = keySerialization.getSerializer(keyClass);
this.keySerializer.open(buffer);
- this.valueSerializer = serializationFactory.getSerializer(valueClass);
+ this.valueSerializer = valSerialization.getSerializer(valueClass);
this.valueSerializer.open(buffer);
} else {
this.closeSerializers = false;
}
}
- public Writer(Configuration conf, FileSystem fs, Path file) throws IOException {
- this(conf, fs, file, null, null, null, null, null);
+ public Writer(Serialization keySerialization, Serialization valSerialization, FileSystem fs, Path file) throws IOException {
+ this(keySerialization, valSerialization, fs, file, null, null, null, null, null);
}
protected void writeHeader(OutputStream outputStream) throws IOException {
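
With the Configuration parameter dropped from these constructors, a caller that knows its key/value types are plain Writables can pass WritableSerialization directly instead of going through a factory, as the updated tests later in this diff do; a sketch (out and codec assumed from the caller):

    // Both serializations are supplied explicitly; no SerializationFactory
    // is constructed inside the Writer anymore.
    IFile.Writer writer = new IFile.Writer(
        new WritableSerialization(), new WritableSerialization(),
        out, Text.class, IntWritable.class, codec, null, null, false);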
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 53d087d..c0d57bb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -387,14 +387,14 @@ public class PipelinedSorter extends ExternalSorter {
*/
synchronized void collect(Object key, Object value, final int partition
) throws IOException {
- if (key.getClass() != keyClass) {
+ if (key.getClass() != serializationContext.getKeyClass()) {
throw new IOException("Type mismatch in key from map: expected "
- + keyClass.getName() + ", received "
+ + serializationContext.getKeyClass().getName() + ", received "
+ key.getClass().getName());
}
- if (value.getClass() != valClass) {
+ if (value.getClass() != serializationContext.getValueClass()) {
throw new IOException("Type mismatch in value from map: expected "
- + valClass.getName() + ", received "
+ + serializationContext.getValueClass().getName() + ", received "
+ value.getClass().getName());
}
if (partition < 0 || partition >= partitions) {
@@ -497,8 +497,9 @@ public class PipelinedSorter extends ExternalSorter {
try {
long segmentStart = out.getPos();
if (!sendEmptyPartitionDetails || (i == partition)) {
- writer = new Writer(conf, out, keyClass, valClass, codec,
- spilledRecordsCounter, null, false);
+ writer = new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+ serializationContext.getValueClass(), codec, spilledRecordsCounter, null, false);
}
// we need not check for combiner since its a single record
if (i == partition) {
@@ -585,8 +586,10 @@ public class PipelinedSorter extends ExternalSorter {
Writer writer = null;
boolean hasNext = kvIter.hasNext();
if (hasNext || !sendEmptyPartitionDetails) {
- writer = new Writer(conf, out, keyClass, valClass, codec,
- spilledRecordsCounter, null, merger.needsRLE());
+ writer = new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+ serializationContext.getValueClass(), codec, spilledRecordsCounter, null,
+ merger.needsRLE());
}
if (combiner == null) {
while (kvIter.next()) {
@@ -789,7 +792,7 @@ public class PipelinedSorter extends ExternalSorter {
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
- keyClass, valClass, codec,
+ serializationContext, codec,
segmentList, mergeFactor,
new Path(uniqueIdentifier),
(RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
@@ -801,9 +804,10 @@ public class PipelinedSorter extends ExternalSorter {
long rawLength = 0;
long partLength = 0;
if (shouldWrite) {
- Writer writer =
- new Writer(conf, finalOut, keyClass, valClass, codec,
- spilledRecordsCounter, null, merger.needsRLE());
+ Writer writer = new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), finalOut,
+ serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+ spilledRecordsCounter, null, merger.needsRLE());
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer, progressable,
TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 3e04e74..5952c81 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -47,6 +47,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader.KeyState;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
@@ -70,7 +71,7 @@ public class TezMerger {
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
+ SerializationContext serializationContext,
CompressionCodec codec, boolean ifileReadAhead,
int ifileReadAheadLength, int ifileBufferSize,
Path[] inputs, boolean deleteInputs,
@@ -84,7 +85,7 @@ public class TezMerger {
return
new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, false, comparator,
- reporter, null).merge(keyClass, valueClass,
+ reporter, null).merge(serializationContext,
mergeFactor, tmpDir,
readsCounter, writesCounter,
bytesReadCounter,
@@ -94,7 +95,7 @@ public class TezMerger {
// Used by the in-memory merger.
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
+ SerializationContext serializationContext,
List<Segment> segments,
int mergeFactor, Path tmpDir,
RawComparator comparator, Progressable reporter,
@@ -104,14 +105,14 @@ public class TezMerger {
Progress mergePhase)
throws IOException, InterruptedException {
// Get rid of this ?
- return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
+ return merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
comparator, reporter, false, readsCounter, writesCounter, bytesReadCounter,
mergePhase);
}
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
+ SerializationContext serializationContext,
List<Segment> segments,
int mergeFactor, Path tmpDir,
RawComparator comparator, Progressable reporter,
@@ -122,15 +123,14 @@ public class TezMerger {
Progress mergePhase)
throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, false).merge(keyClass, valueClass,
- mergeFactor, tmpDir,
+ sortSegments, false).merge(serializationContext, mergeFactor, tmpDir,
readsCounter, writesCounter,
bytesReadCounter, mergePhase);
}
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
+ SerializationContext serializationContext,
CompressionCodec codec,
List<Segment> segments,
int mergeFactor, Path tmpDir,
@@ -144,7 +144,7 @@ public class TezMerger {
throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
sortSegments, codec, considerFinalMergeForProgress, checkForSameKeys).
- merge(keyClass, valueClass,
+ merge(serializationContext,
mergeFactor, tmpDir,
readsCounter, writesCounter,
bytesReadCounter,
@@ -153,7 +153,7 @@ public class TezMerger {
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
+ SerializationContext serializationContext,
CompressionCodec codec,
List<Segment> segments,
int mergeFactor, Path tmpDir,
@@ -167,8 +167,7 @@ public class TezMerger {
throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
sortSegments, codec, considerFinalMergeForProgress).
- merge(keyClass, valueClass,
- mergeFactor, tmpDir,
+ merge(serializationContext, mergeFactor, tmpDir,
readsCounter, writesCounter,
bytesReadCounter,
mergePhase);
@@ -176,7 +175,7 @@ public class TezMerger {
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
+ SerializationContext serializationContext,
CompressionCodec codec,
List<Segment> segments,
int mergeFactor, int inMemSegments, Path tmpDir,
@@ -188,7 +187,7 @@ public class TezMerger {
Progress mergePhase)
throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, codec, false).merge(keyClass, valueClass,
+ sortSegments, codec, false).merge(serializationContext,
mergeFactor, inMemSegments,
tmpDir,
readsCounter, writesCounter,
@@ -689,18 +688,18 @@ public class TezMerger {
return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
}
- public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
- int factor, Path tmpDir,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter bytesReadCounter,
- Progress mergePhase)
+ public TezRawKeyValueIterator merge(SerializationContext serializationContext,
+ int factor, Path tmpDir,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ TezCounter bytesReadCounter,
+ Progress mergePhase)
throws IOException, InterruptedException {
- return merge(keyClass, valueClass, factor, 0, tmpDir,
+ return merge(serializationContext, factor, 0, tmpDir,
readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
- TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+ TezRawKeyValueIterator merge(SerializationContext serializationContext,
int factor, int inMem, Path tmpDir,
TezCounter readsCounter,
TezCounter writesCounter,
@@ -851,9 +850,10 @@ public class TezMerger {
// TODO Would it ever make sense to make this an in-memory writer ?
// Merging because of too many disk segments - might fit in memory.
- Writer writer =
- new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
- writesCounter, null);
+ Writer writer = new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), fs, outputFile,
+ serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+ writesCounter, null);
writeFile(this, writer, reporter, recordsBeforeProgress);
writer.close();
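
The net effect on merge call sites is that the former keyClass/valueClass argument pair is replaced by the single context argument; a sketch of one invocation under the new signature (counters and merge-phase progress passed as null for brevity, other arguments assumed from the MergeManager usage above):

    TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs,
        serializationContext, segments, mergeFactor, tmpDir,
        comparator, progressable, null, null, null, null);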
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 71e8e3f..0260115 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -32,7 +32,6 @@ import java.util.zip.Deflater;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
@@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
@@ -250,14 +250,14 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
synchronized void collect(Object key, Object value, final int partition
) throws IOException {
- if (key.getClass() != keyClass) {
+ if (key.getClass() != serializationContext.getKeyClass()) {
throw new IOException("Type mismatch in key from map: expected "
- + keyClass.getName() + ", received "
+ + serializationContext.getKeyClass().getName() + ", received "
+ key.getClass().getName());
}
- if (value.getClass() != valClass) {
+ if (value.getClass() != serializationContext.getValueClass()) {
throw new IOException("Type mismatch in value from map: expected "
- + valClass.getName() + ", received "
+ + serializationContext.getValueClass().getName() + ", received "
+ value.getClass().getName());
}
if (partition < 0 || partition >= partitions) {
@@ -909,8 +909,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
long segmentStart = out.getPos();
if (spindex < mend && kvmeta.get(offsetFor(spindex) + PARTITION) == i
|| !sendEmptyPartitionDetails) {
- writer = new Writer(conf, out, keyClass, valClass, codec,
- spilledRecordsCounter, null, rle);
+ writer = new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+ serializationContext.getValueClass(), codec, spilledRecordsCounter, null, rle);
}
if (combiner == null) {
// spill directly
@@ -1018,8 +1019,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
long segmentStart = out.getPos();
// Create a new codec, don't care!
if (!sendEmptyPartitionDetails || (i == partition)) {
- writer = new Writer(conf, out, keyClass, valClass, codec,
- spilledRecordsCounter, null, false);
+ writer = new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+ serializationContext.getValueClass(), codec, spilledRecordsCounter, null, false);
}
if (i == partition) {
final long recordStart = out.getPos();
@@ -1298,7 +1300,10 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
long segmentStart = finalOut.getPos();
if (!sendEmptyPartitionDetails) {
Writer writer =
- new Writer(conf, finalOut, keyClass, valClass, codec, null, null);
+ new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), finalOut,
+ serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+ null, null);
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
@@ -1356,7 +1361,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
- keyClass, valClass, codec,
+ serializationContext, codec,
segmentList, mergeFactor,
new Path(taskIdentifier),
(RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
@@ -1369,9 +1374,10 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
long rawLength = 0;
long partLength = 0;
if (shouldWrite) {
- Writer writer =
- new Writer(conf, finalOut, keyClass, valClass, codec,
- spilledRecordsCounter, null);
+ Writer writer = new Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), finalOut,
+ serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+ spilledRecordsCounter, null);
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer,
progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index 9bf1517..16291da 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.io.serializer.Serialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -57,6 +58,8 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
protected final Serializer keySerializer;
protected final Serializer valSerializer;
protected final SerializationFactory serializationFactory;
+ protected final Serialization keySerialization;
+ protected final Serialization valSerialization;
protected final int numPartitions;
protected final CompressionCodec codec;
protected final TezTaskOutput outputFileHandler;
@@ -119,8 +122,10 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
serializationFactory = new SerializationFactory(this.conf);
- keySerializer = serializationFactory.getSerializer(keyClass);
- valSerializer = serializationFactory.getSerializer(valClass);
+ keySerialization = serializationFactory.getSerialization(keyClass);
+ valSerialization = serializationFactory.getSerialization(valClass);
+ keySerializer = keySerialization.getSerializer(keyClass);
+ valSerializer = valSerialization.getSerializer(valClass);
outputRecordBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 9e87098..5ba1e35 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -280,7 +280,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
finalOutPath = outputFileHandler.getOutputFileForWrite();
finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
skipBuffers = true;
- writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
+ writer = new IFile.Writer(keySerialization, valSerialization, rfs, finalOutPath, keyClass, valClass,
codec, outputRecordsCounter, outputRecordBytesCounter);
if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
@@ -615,7 +615,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
continue;
}
if (writer == null) {
- writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
+ writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null);
}
numRecords += writePartition(buffer.partitionPositions[i], buffer, writer, key, val);
}
@@ -1006,7 +1006,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
LOG.info(destNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records");
continue;
}
- writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
+ writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null);
try {
if (currentBuffer.nextPosition != 0
&& currentBuffer.partitionPositions[i] != WrappedBuffer.PARTITION_ABSENT_POSITION) {
@@ -1099,7 +1099,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
spilledRecordsCounter.increment(1);
Writer writer = null;
try {
- writer = new IFile.Writer(conf, out, keyClass, valClass, codec, null, null);
+ writer = new IFile.Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null);
writer.append(key, value);
outputLargeRecordsCounter.increment(1);
numRecordsPerPartition[i]++;
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index 642f02b..eaded18 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -6,6 +6,7 @@ import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
@@ -102,8 +104,7 @@ public class TestValuesIterator {
FileSystem fs;
static final Random rnd = new Random();
- final Class keyClass;
- final Class valClass;
+ private SerializationContext serializationContext;
final RawComparator comparator;
final RawComparator correctComparator;
final boolean expectedTestResult;
@@ -129,20 +130,18 @@ public class TestValuesIterator {
* @param testResult expected result
* @throws IOException
*/
- public TestValuesIterator(String serializationClassName, Class key, Class val,
+ public TestValuesIterator(String serializationClassName, Class<?> key, Class<?> val,
TestWithComparator comparator, TestWithComparator correctComparator, boolean testResult)
throws IOException {
- this.keyClass = key;
- this.valClass = val;
this.comparator = getComparator(comparator);
this.correctComparator =
(correctComparator == null) ? this.comparator : getComparator(correctComparator);
this.expectedTestResult = testResult;
originalData = LinkedListMultimap.create();
- setupConf(serializationClassName);
+ setupConf(key, val, serializationClassName);
}
- private void setupConf(String serializationClassName) throws IOException {
+ private void setupConf(Class<?> key, Class<?> val, String serializationClassName) throws IOException {
mergeFactor = 2;
conf = new Configuration();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, mergeFactor);
@@ -154,6 +153,11 @@ public class TestValuesIterator {
String localDirs = baseDir.toString();
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
fs = FileSystem.getLocal(conf);
+
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ serializationContext = new SerializationContext(key, val,
+ serializationFactory.getSerialization(key), serializationFactory.getSerialization(val));
+ serializationContext.applyToConf(conf);
}
@Before
@@ -231,20 +235,21 @@ public class TestValuesIterator {
streamPaths = new Path[0];
//This will return EmptyIterator
rawKeyValueIterator =
- TezMerger.merge(conf, fs, keyClass, valClass, null,
+ TezMerger.merge(conf, fs, serializationContext, null,
false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
new ProgressReporter(), null, null, null, null);
} else {
List<TezMerger.Segment> segments = Lists.newLinkedList();
//This will return EmptyIterator
rawKeyValueIterator =
- TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+ TezMerger.merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
new GenericCounter("writesCounter", "y1"),
new GenericCounter("bytesReadCounter", "y2"), new Progress());
}
return new ValuesIterator(rawKeyValueIterator, comparator,
- keyClass, valClass, conf, (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
+ serializationContext.getKeyClass(), serializationContext.getValueClass(), conf,
+ (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
(TezCounter) new GenericCounter("inputValueCounter", "y4"));
}
@@ -332,19 +337,20 @@ public class TestValuesIterator {
streamPaths = createFiles();
//Merge all files to get KeyValueIterator
rawKeyValueIterator =
- TezMerger.merge(conf, fs, keyClass, valClass, null,
+ TezMerger.merge(conf, fs, serializationContext, null,
false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
new ProgressReporter(), null, null, null, null);
} else {
List<TezMerger.Segment> segments = createInMemStreams();
rawKeyValueIterator =
- TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+ TezMerger.merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
new GenericCounter("writesCounter", "y1"),
new GenericCounter("bytesReadCounter", "y2"), new Progress());
}
- return new ValuesIterator(rawKeyValueIterator, comparator,
- keyClass, valClass, conf, (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
+ return new ValuesIterator(rawKeyValueIterator, comparator, serializationContext.getKeyClass(),
+ serializationContext.getValueClass(), conf,
+ (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
(TezCounter) new GenericCounter("inputValueCounter", "y4"));
}
@@ -364,19 +370,19 @@ public class TestValuesIterator {
streamPaths = createFiles();
//Merge all files to get KeyValueIterator
rawKeyValueIterator =
- TezMerger.merge(conf, fs, keyClass, valClass, null,
+ TezMerger.merge(conf, fs, serializationContext, null,
false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
new ProgressReporter(), null, null, null, null);
} else {
List<TezMerger.Segment> segments = createInMemStreams();
rawKeyValueIterator =
- TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+ TezMerger.merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
new GenericCounter("writesCounter", "y1"),
new GenericCounter("bytesReadCounter", "y2"), new Progress());
}
- return new ValuesIterator(rawKeyValueIterator, comparator,
- keyClass, valClass, conf, keyCounter, tupleCounter);
+ return new ValuesIterator(rawKeyValueIterator, comparator, serializationContext.getKeyClass(),
+ serializationContext.getValueClass(), conf, keyCounter, tupleCounter);
}
@Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3} {4} {5} {6}]")
@@ -454,7 +460,9 @@ public class TestValuesIterator {
paths[i] = new Path(baseDir, "ifile_" + i + ".out");
FSDataOutputStream out = fs.create(paths[i]);
//write data with RLE
- IFile.Writer writer = new IFile.Writer(conf, out, keyClass, valClass, null, null, null, true);
+ IFile.Writer writer = new IFile.Writer(serializationContext.getKeySerialization(),
+ serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+ serializationContext.getValueClass(), null, null, null, true);
Map<Writable, Writable> data = createData();
for (Map.Entry<Writable, Writable> entry : data.entrySet()) {
@@ -487,9 +495,8 @@ public class TestValuesIterator {
int numberOfStreams = Math.max(2, rnd.nextInt(10));
LOG.info("No of streams : " + numberOfStreams);
- SerializationFactory serializationFactory = new SerializationFactory(conf);
- Serializer keySerializer = serializationFactory.getSerializer(keyClass);
- Serializer valueSerializer = serializationFactory.getSerializer(valClass);
+ Serializer keySerializer = serializationContext.getKeySerializer();
+ Serializer valueSerializer = serializationContext.getValueSerializer();
LocalDirAllocator localDirAllocator =
new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
@@ -549,8 +556,8 @@ public class TestValuesIterator {
private Map<Writable, Writable> createData() {
Map<Writable, Writable> map = new TreeMap<Writable, Writable>(comparator);
for (int j = 0; j < Math.max(10, rnd.nextInt(50)); j++) {
- Writable key = createData(keyClass);
- Writable value = createData(valClass);
+ Writable key = createData(serializationContext.getKeyClass());
+ Writable value = createData(serializationContext.getValueClass());
map.put(key, value);
//sortedDataMap.put(key, value);
}
@@ -558,7 +565,7 @@ public class TestValuesIterator {
}
- private Writable createData(Class c) {
+ private Writable createData(Class<?> c) {
if (c.getName().equalsIgnoreCase(BytesWritable.class.getName())) {
return new BytesWritable(new BigInteger(256, rnd).toString().getBytes());
} else if (c.getName().equalsIgnoreCase(IntWritable.class.getName())) {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
index 6fef944..42231e9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -131,8 +132,8 @@ public class TestUnorderedKVReader {
private void createIFile(Path path, int recordCount) throws IOException {
FSDataOutputStream out = localFs.create(path);
- IFile.Writer writer =
- new IFile.Writer(defaultConf, out, Text.class, Text.class, null, null, null, true);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ out, Text.class, Text.class, null, null, null, true);
for (int i = 0; i < recordCount; i++) {
writer.append(new Text("Key_" + i), new Text("Value_" + i));
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 9cffcc7..13f090c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -38,6 +38,7 @@ import java.util.UUID;
import com.google.common.collect.Sets;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.WritableSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -630,8 +631,8 @@ public class TestMergeManager {
private byte[] generateDataBySize(Configuration conf, int rawLen, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
- IFile.Writer writer =
- new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ fsdos, IntWritable.class, IntWritable.class, null, null, null);
int i = 0;
while(true) {
writer.append(new IntWritable(i), new IntWritable(i));
@@ -653,8 +654,8 @@ public class TestMergeManager {
InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
- IFile.Writer writer =
- new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ fsdos, IntWritable.class, IntWritable.class, null, null, null);
int i = 0;
while(true) {
writer.append(new IntWritable(i), new IntWritable(i));
@@ -676,8 +677,8 @@ public class TestMergeManager {
InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
- IFile.Writer writer =
- new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ fsdos, IntWritable.class, IntWritable.class, null, null, null);
for (int i = 0; i < numEntries; ++i) {
writer.append(new IntWritable(i), new IntWritable(i));
}
@@ -1015,7 +1016,8 @@ public class TestMergeManager {
for (int i = 0; i < numPartitions; i++) {
long pos = outStream.getPos();
IFile.Writer writer =
- new IFile.Writer(conf, outStream, IntWritable.class, IntWritable.class, null, null, null);
+ new IFile.Writer(new WritableSerialization(), new WritableSerialization(), outStream,
+ IntWritable.class, IntWritable.class, null, null, null);
for (int j = 0; j < numKeysPerPartition; j++) {
writer.append(new IntWritable(currentKey), new IntWritable(currentKey));
currentKey++;
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 9189f50..0678d04 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -146,7 +147,7 @@ public class TestIFile {
public void testWritingEmptyKeyValues() throws IOException {
DataInputBuffer key = new DataInputBuffer();
DataInputBuffer value = new DataInputBuffer();
- IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath, null, null, null,
+ IFile.Writer writer = new IFile.Writer(null, null, localFs, outputPath, null, null, null,
null, null);
writer.append(key, value);
writer.append(key, value);
@@ -205,7 +206,7 @@ public class TestIFile {
// Check Key length exceeding MAX_BUFFER_SIZE
out = localFs.create(outputPath);
- writer = new IFile.Writer(defaultConf, out,
+ writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, Text.class, null, null, null, false);
writer.append(longString, shortString);
writer.close();
@@ -228,7 +229,7 @@ public class TestIFile {
// Check Value length exceeding MAX_BUFFER_SIZE
out = localFs.create(outputPath);
- writer = new IFile.Writer(defaultConf, out,
+ writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, Text.class, null, null, null, false);
writer.append(shortString, longString);
writer.close();
@@ -252,7 +253,7 @@ public class TestIFile {
// Check Key length not getting doubled
out = localFs.create(outputPath);
- writer = new IFile.Writer(defaultConf, out,
+ writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, Text.class, null, null, null, false);
writer.append(longString, shortString);
writer.close();
@@ -271,7 +272,7 @@ public class TestIFile {
// Check Value length not getting doubled
out = localFs.create(outputPath);
- writer = new IFile.Writer(defaultConf, out,
+ writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, Text.class, null, null, null, false);
writer.append(shortString, longString);
writer.close();
@@ -298,7 +299,7 @@ public class TestIFile {
public void testWithRLEMarker() throws IOException {
//Test with append(Object, Object)
FSDataOutputStream out = localFs.create(outputPath);
- IFile.Writer writer = new IFile.Writer(defaultConf, out,
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, IntWritable.class, codec, null, null, true);
Text key = new Text("key0");
@@ -324,7 +325,7 @@ public class TestIFile {
int valueLength = 6;
int pos = 0;
out = localFs.create(outputPath);
- writer = new IFile.Writer(defaultConf, out,
+ writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, IntWritable.class, codec, null, null, true);
BoundedByteArrayOutputStream boundedOut = new BoundedByteArrayOutputStream(1024*1024);
@@ -467,8 +468,8 @@ public class TestIFile {
//Test appendValue feature
public void testAppendValue() throws IOException {
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
- IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
- Text.class, IntWritable.class, codec, null, null);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
Text previousKey = null;
for (KVPair kvp : data) {
@@ -498,8 +499,8 @@ public class TestIFile {
values.add(val);
}
- IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
- Text.class, IntWritable.class, codec, null, null);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
writer.append(data.get(0).getKey(), data.get(0).getvalue()); //write first KV pair
writer.appendValues(values.subList(1, values.size()).iterator()); //add the rest here
@@ -526,8 +527,8 @@ public class TestIFile {
values.add(val);
}
- IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
- Text.class, IntWritable.class, codec, null, null);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
writer.appendKeyValues(data.get(0).getKey(), values.iterator());
Text lastKey = new Text("key3");
@@ -544,8 +545,8 @@ public class TestIFile {
//Test appendValue with DataInputBuffer
public void testAppendValueWithDataInputBuffer() throws IOException {
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
- IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
- Text.class, IntWritable.class, codec, null, null);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
final DataInputBuffer previousKey = new DataInputBuffer();
DataInputBuffer key = new DataInputBuffer();
@@ -796,7 +797,7 @@ public class TestIFile {
private Writer writeTestFile(boolean rle, boolean repeatKeys,
List<KVPair> data, CompressionCodec codec) throws IOException {
FSDataOutputStream out = localFs.create(outputPath);
- IFile.Writer writer = new IFile.Writer(defaultConf, out,
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, IntWritable.class, codec, null, null, rle);
writeTestFile(writer, repeatKeys, data);
out.close();
@@ -829,7 +830,7 @@ public class TestIFile {
private Writer writeTestFileUsingDataBuffer(boolean rle, boolean repeatKeys,
List<KVPair> data, CompressionCodec codec) throws IOException {
FSDataOutputStream out = localFs.create(outputPath);
- IFile.Writer writer = new IFile.Writer(defaultConf, out,
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, IntWritable.class, codec, null, null, rle);
writeTestFileUsingDataBuffer(writer, repeatKeys, data);
out.close();
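
The TestTezMerger hunks below exercise the other half of the change: the separate key/value class arguments to TezMerger.merge() are replaced by a single SerializationContext, which bundles the key/value classes with their Serialization instances so the merger no longer rebuilds a SerializationFactory per merge pass. A minimal sketch of the new call shape, mirroring the argument order in the updated merge() helper below; conf, fs, inputs, tmpPath, comparator, reporter and progress are placeholder variables, not names from the patch.

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.serializer.WritableSerialization;
  import org.apache.tez.runtime.library.common.serializer.SerializationContext;

  SerializationContext serContext = new SerializationContext(
      IntWritable.class, LongWritable.class,
      new WritableSerialization(), new WritableSerialization());

  // Same positional arguments as before, with serContext standing in for
  // the old keyClass/valueClass pair; merge() throws IOException and
  // InterruptedException.
  TezRawKeyValueIterator records = new TezMerger().merge(conf, fs, serContext,
      null, false, 0, 1024, inputs, true, 4,
      tmpPath, comparator, reporter, null, null, null, progress);
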
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index af10700..c6574d7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
@@ -136,7 +138,7 @@ public class TestTezMerger {
private Path createIFileWithTextData(List<String> data) throws IOException {
Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
FSDataOutputStream out = localFs.create(path);
- IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class,
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out, Text.class,
Text.class, null, null, null, true);
for (String key : data) {
writer.append(new Text(key), new Text(key + "_" + System.nanoTime()));
@@ -568,11 +570,12 @@ public class TestTezMerger {
private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc)
throws IOException, InterruptedException {
TezMerger merger = new TezMerger();
- TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
- LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
- true, 4, new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
- new Reporter(), null, null,
- null, new Progress());
+ TezRawKeyValueIterator records = merger.merge(defaultConf, localFs,
+ new SerializationContext(IntWritable.class, LongWritable.class, new WritableSerialization(),
+ new WritableSerialization()),
+ null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]), true, 4,
+ new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
+ new Reporter(), null, null, null, new Progress());
return records;
}
@@ -602,12 +605,12 @@ public class TestTezMerger {
private void merge(List<Path> pathList, int mergeFactor, RawComparator rc) throws Exception {
//Merge datasets
TezMerger merger = new TezMerger();
- TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
- LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
- true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()),
- ((rc == null) ? comparator : rc), new Reporter(), null, null,
- null,
- new Progress());
+ TezRawKeyValueIterator records = merger.merge(defaultConf, localFs,
+ new SerializationContext(IntWritable.class, LongWritable.class, new WritableSerialization(),
+ new WritableSerialization()),
+ null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]), true, mergeFactor,
+ new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
+ new Reporter(), null, null, null, new Progress());
verifyData(records);
verificationDataSet.clear();
@@ -701,9 +704,10 @@ public class TestTezMerger {
TezMerger.MergeQueue mergeQueue = new TezMerger.MergeQueue(defaultConf, localFs, segmentList,
comparator, new Reporter(), false, false);
- TezRawKeyValueIterator records = mergeQueue.merge(IntWritable.class, LongWritable.class,
- mergeFactor, new Path(workDir, "tmp_"
- + System.nanoTime()), null, null, null, null);
+ TezRawKeyValueIterator records = mergeQueue.merge(
+ new SerializationContext(IntWritable.class, LongWritable.class, new WritableSerialization(),
+ new WritableSerialization()),
+ mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()), null, null, null, null);
//Verify the merged data is correct
verifyData(records);
@@ -770,8 +774,8 @@ public class TestTezMerger {
Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
FSDataOutputStream out = localFs.create(path);
//create IFile with RLE
- IFile.Writer writer = new IFile.Writer(defaultConf, out, IntWritable.class
- , LongWritable.class, null, null, null, true);
+ IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+ out, IntWritable.class, LongWritable.class, null, null, null, true);
for (Integer key : dataSet.keySet()) {
for (Long value : dataSet.get(key)) {