You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/12 21:27:54 UTC
tez git commit: TEZ-1904. Fix findbugs warnings in
tez-runtime-library module. (sseth)
Repository: tez
Updated Branches:
refs/heads/master c8bae3649 -> fda4c0bdc
TEZ-1904. Fix findbugs warnings in tez-runtime-library module. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fda4c0bd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fda4c0bd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fda4c0bd
Branch: refs/heads/master
Commit: fda4c0bdcfc1ddecf2872326bd1da50dfb1a9fc7
Parents: c8bae36
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jan 12 12:27:39 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jan 12 12:27:39 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
tez-runtime-library/findbugs-exclude.xml | 86 ++++++++++++++++++++
.../vertexmanager/InputReadyVertexManager.java | 8 +-
.../vertexmanager/ShuffleVertexManager.java | 9 +-
.../library/api/TezRuntimeConfiguration.java | 2 +-
.../tez/runtime/library/common/Constants.java | 2 +-
.../common/security/SecureShuffleUtils.java | 26 ++----
.../runtime/library/common/shuffle/Fetcher.java | 8 +-
.../library/common/shuffle/ShuffleUtils.java | 2 +-
.../common/shuffle/impl/ShuffleManager.java | 2 +-
.../shuffle/orderedgrouped/InMemoryReader.java | 12 ++-
.../shuffle/orderedgrouped/MergeManager.java | 4 +-
.../common/shuffle/orderedgrouped/Shuffle.java | 25 +++---
.../orderedgrouped/ShuffleScheduler.java | 2 +-
.../runtime/library/common/sort/impl/IFile.java | 18 ++--
.../common/sort/impl/IFileInputStream.java | 7 +-
.../common/sort/impl/PipelinedSorter.java | 23 +++---
.../library/common/sort/impl/TezMerger.java | 20 ++---
.../common/sort/impl/dflt/DefaultSorter.java | 10 +--
.../writers/UnorderedPartitionedKVWriter.java | 2 +-
.../library/input/OrderedGroupedKVInput.java | 4 +-
.../runtime/library/input/UnorderedKVInput.java | 4 +-
.../library/processor/SleepProcessor.java | 6 +-
23 files changed, 178 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e16a2e9..70a827c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1904. Fix findbugs warnings in tez-runtime-library module.
TEZ-1903. Fix findbugs warnings in tez-runtime-internal module.
TEZ-1896. Move the default heartbeat timeout and checkinterval to TezConfiguration.
TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure.
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 5b11308..45c194c 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -13,4 +13,90 @@
-->
<FindBugsFilter>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler$Penalty"/>
+ <Method name="compareTo" params="java.lang.Object" returns="int"/>
+ <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
+ </Match>
+
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SpanIterator"/>
+ <Method name="compareTo" params="org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SpanIterator" returns="int"/>
+ <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
+ </Match>
+
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.comparator.TezBytesComparator"/>
+ <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/>
+ </Match>
+
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput$MapOutputComparator"/>
+ <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/>
+ </Match>
+
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader"/>
+ <Method name="<init>"/>
+ <Field name="buffer"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.Fetcher"/>
+ <Method name="shutdownInternal" params="boolean" returns="void"/><Field name="isShutDown"/>
+ <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
+ </Match>
+
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.comparator.TezBytesComparator"/>
+ <Method name="getProxy" params="org.apache.hadoop.io.BytesWritable" returns="int"/>
+ <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>
+ <Method name="<init>"/>
+ <Bug pattern="SC_START_IN_CTOR"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.Constants"/>
+ <Field name="MERGED_OUTPUT_PREFIX"/>
+ <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
+ </Match>
+
+ <!-- TODO This needs more looking into -->
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>
+ <Field name="kvindex"/>
+ <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/>
+ <Field name="PARSER"/>
+ <Bug pattern="MS_SHOULD_BE_FINAL"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/>
+ <Field name="unknownFields"/>
+ <Bug pattern="SE_BAD_FIELD"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto\$Builder"/>
+ <Method name="maybeForceBuilderInitialization"/>
+ <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+ </Match>
+
+
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 8f30276..11185ee 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -53,7 +53,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
super(context);
}
- class SourceVertexInfo {
+ static class SourceVertexInfo {
EdgeProperty edgeProperty;
int numTasks;
int numFinishedTasks;
@@ -142,7 +142,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
if (srcInfo.taskIsFinished[taskId.intValue()] == null) {
// not a duplicate completion
- srcInfo.taskIsFinished[taskId.intValue()] = new Boolean(true);
+ srcInfo.taskIsFinished[taskId.intValue()] = Boolean.valueOf(true);
srcInfo.numFinishedTasks++;
if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.ONE_TO_ONE) {
oneToOneSrcTasksDoneCount[taskId.intValue()]++;
@@ -181,7 +181,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
tasksToStart = Lists.newArrayListWithCapacity(numTasks);
for (int i=0; i<numTasks; ++i) {
taskIsStarted[i] = true;
- tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
+ tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), null));
}
} else {
// start only the ready 1-1 tasks
@@ -196,7 +196,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
LOG.info("Starting task " + i + " for vertex: "
+ getContext().getVertexName() + " with location: "
+ ((locationHint != null) ? locationHint.getAffinitizedTask() : "null"));
- tasksToStart.add(new TaskWithLocationHint(new Integer(i), locationHint));
+ tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), locationHint));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 05f94c5..a993449 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -142,7 +142,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
int bipartiteSources = 0;
long completedSourceTasksOutputSize = 0;
- class SourceVertexInfo {
+ static class SourceVertexInfo {
EdgeProperty edgeProperty;
boolean vertexIsConfigured;
BitSet finishedTaskSet;
@@ -227,9 +227,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
int targetIndex =
sourceTaskIndex * partitionRange
+ sourceIndex % partitionRange;
-
- destinationTaskAndInputIndices.put(new Integer(destinationTaskIndex),
- Collections.singletonList(new Integer(targetIndex)));
+
+ destinationTaskAndInputIndices.put(destinationTaskIndex, Collections.singletonList(targetIndex));
}
@Override
@@ -394,7 +393,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
void updatePendingTasks() {
pendingTasks.clear();
for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
- pendingTasks.add(new Integer(i));
+ pendingTasks.add(i);
}
totalTasksToSchedule = pendingTasks.size();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 3c0f11c..11b33f4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -94,7 +94,7 @@ public class TezRuntimeConfiguration {
public static final String TEZ_RUNTIME_SORT_SPILL_PERCENT = TEZ_RUNTIME_PREFIX +
"sort.spill.percent";
- public static float TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT = 0.8f;
+ public static final float TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT = 0.8f;
public static final String TEZ_RUNTIME_IO_SORT_MB = TEZ_RUNTIME_PREFIX + "io.sort.mb";
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
index 2a5c795..d56d86c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
@@ -31,7 +31,7 @@ public class Constants {
public static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
- public static String MERGED_OUTPUT_PREFIX = ".merged";
+ public static final String MERGED_OUTPUT_PREFIX = ".merged";
public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
index b1424c0..f36d41d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
@@ -18,13 +18,12 @@
package org.apache.tez.runtime.library.common.security;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.PrintStream;
import java.net.URL;
import javax.crypto.SecretKey;
+import com.google.common.base.Charsets;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -47,7 +46,7 @@ public class SecureShuffleUtils {
* @param msg
*/
public static String generateHash(byte[] msg, SecretKey key) {
- return new String(Base64.encodeBase64(generateByteHash(msg, key)));
+ return new String(Base64.encodeBase64(generateByteHash(msg, key)), Charsets.UTF_8);
}
/**
@@ -80,7 +79,7 @@ public class SecureShuffleUtils {
*/
public static String hashFromString(String enc_str, JobTokenSecretManager mgr)
throws IOException {
- return new String(Base64.encodeBase64(mgr.computeHash(enc_str.getBytes())));
+ return new String(Base64.encodeBase64(mgr.computeHash(enc_str.getBytes(Charsets.UTF_8))), Charsets.UTF_8);
}
/**
@@ -91,9 +90,9 @@ public class SecureShuffleUtils {
*/
public static void verifyReply(String base64Hash, String msg, JobTokenSecretManager mgr)
throws IOException {
- byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
+ byte[] hash = Base64.decodeBase64(base64Hash.getBytes(Charsets.UTF_8));
- boolean res = verifyHash(hash, msg.getBytes(), mgr);
+ boolean res = verifyHash(hash, msg.getBytes(Charsets.UTF_8), mgr);
if(res != true) {
throw new IOException("Verification of the hashReply failed");
@@ -118,19 +117,4 @@ public class SecureShuffleUtils {
private static String buildMsgFrom(String uri_path, String uri_query, int port) {
return String.valueOf(port) + uri_path + "?" + uri_query;
}
-
-
- /**
- * byte array to Hex String
- * @param ba
- * @return string with HEX value of the key
- */
- public static String toHex(byte[] ba) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
- for(byte b: ba) {
- ps.printf("%x", b);
- }
- return baos.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index e600f18..68951d5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -271,10 +271,8 @@ public class Fetcher implements Callable<FetchResult> {
.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
if (!renamed) {
localFs.delete(tmpIndex, false);
- if (outputPath != null) {
- // invariant: outputPath was renamed from tmpPath
- localFs.delete(outputPath, false);
- }
+ // invariant: outputPath was renamed from tmpPath
+ localFs.delete(outputPath, false);
LOG.warn("Could not rename the index file to "
+ outputPath
.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
@@ -505,7 +503,7 @@ public class Fetcher implements Callable<FetchResult> {
if (isShutDown.get() && failedInputs != null && failedInputs.length > 0) {
LOG.info("Fetcher already shutdown. Not reporting fetch failures for: " +
- (failedInputs == null ? 0 : failedInputs.length) + " failed inputs");
+ failedInputs.length + " failed inputs");
failedInputs = null;
}
return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index fb929e5..e01b985 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -48,7 +48,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem
public class ShuffleUtils {
private static final Log LOG = LogFactory.getLog(ShuffleUtils.class);
- public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+ public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 42013e3..13296c7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -688,7 +688,7 @@ public class ShuffleManager implements FetcherCallback {
* Fake input that is added to the completed input list in case an input does not have any data.
*
*/
- private class NullFetchedInput extends FetchedInput {
+ private static class NullFetchedInput extends FetchedInput {
public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 7ec56ec..75c552e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -83,12 +83,20 @@ public class InMemoryReader extends Reader {
File dumpFile = new File("../output/" + taskAttemptId + ".dump");
System.err.println("Dumping corrupt map-output of " + taskAttemptId +
" to " + dumpFile.getAbsolutePath());
+ FileOutputStream fos = null;
try {
- FileOutputStream fos = new FileOutputStream(dumpFile);
+ fos = new FileOutputStream(dumpFile);
fos.write(buffer, 0, bufferSize);
- fos.close();
} catch (IOException ioe) {
System.err.println("Failed to dump map-output of " + taskAttemptId);
+ } finally {
+ if (fos != null) {
+ try {
+ fos.close();
+ } catch (IOException e) {
+ System.err.println("Failed to dump map-output of " + taskAttemptId);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index ad50bb5..aca7dc0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -696,7 +696,7 @@ public class MergeManager {
final boolean preserve = fileChunk.isLocalFile();
final Path file = fileChunk.getPath();
approxOutputSize += size;
- Segment segment = new Segment(conf, rfs, file, offset, size, codec, ifileReadAhead,
+ Segment segment = new Segment(rfs, file, offset, size, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, preserve);
inputSegments.add(segment);
}
@@ -924,7 +924,7 @@ public class MergeManager {
final long fileOffset = fileChunk.getOffset();
final boolean preserve = fileChunk.isLocalFile();
- diskSegments.add(new Segment(job, fs, file, fileOffset, fileLength, codec, ifileReadAhead,
+ diskSegments.add(new Segment(fs, file, fileOffset, fileLength, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, preserve, counter));
}
LOG.info("Merging " + onDisk.length + " files, " +
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index a9701a6..138ecb9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.SecretKey;
@@ -95,7 +96,7 @@ public class Shuffle implements ExceptionReporter {
private final int numFetchers;
private final boolean localDiskFetchEnabled;
- private Throwable throwable = null;
+ private AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
private String throwingThreadName = null;
private final RunShuffleCallable runShuffleCallable;
@@ -260,8 +261,8 @@ public class Shuffle implements ExceptionReporter {
if (isShutDown.get()) {
throw new InputAlreadyClosedException();
}
- if (throwable != null) {
- handleThrowable(throwable);
+ if (throwable.get() != null) {
+ handleThrowable(throwable.get());
}
if (runShuffleFuture == null) {
return false;
@@ -301,8 +302,8 @@ public class Shuffle implements ExceptionReporter {
if (isShutDown.get()) {
throw new InputAlreadyClosedException();
}
- if (throwable != null) {
- handleThrowable(throwable);
+ if (throwable.get() != null) {
+ handleThrowable(throwable.get());
}
return kvIter;
}
@@ -342,10 +343,10 @@ public class Shuffle implements ExceptionReporter {
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
- synchronized (this) {
- if (throwable != null) {
+ synchronized (Shuffle.this) {
+ if (throwable.get() != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
- throwable);
+ throwable.get());
}
}
}
@@ -368,9 +369,9 @@ public class Shuffle implements ExceptionReporter {
// Sanity check
synchronized (Shuffle.this) {
- if (throwable != null) {
+ if (throwable.get() != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
- throwable);
+ throwable.get());
}
}
@@ -450,8 +451,8 @@ public class Shuffle implements ExceptionReporter {
@Private
public synchronized void reportException(Throwable t) {
// RunShuffleCallable onFailure deals with ignoring errors on shutdown.
- if (throwable == null) {
- throwable = t;
+ if (throwable.get() == null) {
+ throwable.set(t);
throwingThreadName = Thread.currentThread().getName();
// Notify the scheduler so that the reporting thread finds the
// exception immediately.
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 52e7334..066b94a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -472,7 +472,7 @@ class ShuffleScheduler {
// This may be removed after TEZ-914
InputAttemptIdentifier id = listItr.next();
if (inputShouldBeConsumed(id)) {
- Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
+ Integer inputNumber = Integer.valueOf(id.getInputIdentifier().getInputIndex());
InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
dedupedList.put(inputNumber, id);
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index bd31151..049bcef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -64,7 +64,7 @@ public class IFile {
public static final int RLE_MARKER = -2; // Repeat same key marker
public static final int V_END_MARKER = -3; // End of values marker
public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
- public static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I',
+ static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I',
(byte) 'F' , (byte) 0};
private static final String INCOMPLETE_READ = "Requested to read %d got %d";
@@ -98,10 +98,9 @@ public class IFile {
IFileOutputStream checksumOut;
- Class keyClass;
- Class valueClass;
- Serializer keySerializer;
- Serializer valueSerializer;
+ boolean closeSerializers = false;
+ Serializer keySerializer = null;
+ Serializer valueSerializer = null;
final DataOutputBuffer buffer = new DataOutputBuffer();
final DataOutputBuffer previous = new DataOutputBuffer();
@@ -165,16 +164,17 @@ public class IFile {
this.out = new FSDataOutputStream(checksumOut,null);
}
writeHeader(outputStream);
- this.keyClass = keyClass;
- this.valueClass = valueClass;
if (keyClass != null) {
+ this.closeSerializers = true;
SerializationFactory serializationFactory =
new SerializationFactory(conf);
this.keySerializer = serializationFactory.getSerializer(keyClass);
this.keySerializer.open(buffer);
this.valueSerializer = serializationFactory.getSerializer(valueClass);
this.valueSerializer.open(buffer);
+ } else {
+ this.closeSerializers = false;
}
}
@@ -197,7 +197,7 @@ public class IFile {
// When IFile writer is created by BackupStore, we do not have
// Key and Value classes set. So, check before closing the
// serializers
- if (keyClass != null) {
+ if (closeSerializers) {
keySerializer.close();
valueSerializer.close();
}
@@ -506,7 +506,6 @@ public class IFile {
byte keyBytes[] = new byte[0];
long startPos;
- protected boolean isCompressed = false;
/**
* Construct an IFile Reader.
@@ -565,7 +564,6 @@ public class IFile {
TezCounter readsCounter, TezCounter bytesReadCounter,
boolean readAhead, int readAheadLength,
int bufferSize, boolean isCompressed) throws IOException {
- this.isCompressed = isCompressed;
checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/*, isCompressed*/);
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index abfe4ad..e346793 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -23,6 +23,7 @@ import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -275,7 +276,7 @@ public class IFileInputStream extends InputStream {
", len=" + len +
", length=" + length +
", checksumSize=" + checksumSize+
- ", csum=" + csum +
+ ", csum=" + Arrays.toString(csum) +
", sum=" + sum;
LOG.info(mesg);
@@ -298,10 +299,6 @@ public class IFileInputStream extends InputStream {
return result;
}
- public byte[] getChecksum() {
- return csum;
- }
-
void disableChecksumValidation() {
disableChecksumValidation = true;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index ae654b4..2fef83b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -111,7 +111,7 @@ public class PipelinedSorter extends ExternalSorter {
LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
// TODO: configurable setting?
span = new SortSpan(largeBuffer, 1024*1024, 16);
- merger = new SpanMerger(comparator);
+ merger = new SpanMerger();
final int sortThreads =
this.conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS,
@@ -345,7 +345,7 @@ public class PipelinedSorter extends ExternalSorter {
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment s =
- new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
+ new Segment(rfs, spillFilename, indexRecord.getStartOffset(),
indexRecord.getPartLength(), codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, true);
segmentList.add(i, s);
@@ -405,7 +405,7 @@ public class PipelinedSorter extends ExternalSorter {
int getPartition();
}
- private class BufferStreamWrapper extends OutputStream
+ private static class BufferStreamWrapper extends OutputStream
{
private final ByteBuffer out;
public BufferStreamWrapper(ByteBuffer out) {
@@ -420,7 +420,7 @@ public class PipelinedSorter extends ExternalSorter {
public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
}
- protected class InputByteBuffer extends DataInputBuffer {
+ protected static class InputByteBuffer extends DataInputBuffer {
private byte[] buffer = new byte[256];
private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
private void resize(int length) {
@@ -607,7 +607,7 @@ public class PipelinedSorter extends ExternalSorter {
}
}
- private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
+ private static class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
private int kvindex = -1;
private int maxindex;
private IntBuffer kvmeta;
@@ -617,7 +617,7 @@ public class PipelinedSorter extends ExternalSorter {
private InputByteBuffer value = new InputByteBuffer();
private Progress progress = new Progress();
- private final int minrun = (1 << 4);
+ private static final int minrun = (1 << 4);
public SpanIterator(SortSpan span) {
this.kvmeta = span.kvmeta;
@@ -644,7 +644,7 @@ public class PipelinedSorter extends ExternalSorter {
// caveat: since we use this as a comparable in the merger
if(kvindex == maxindex) return false;
if(kvindex % 100 == 0) {
- progress.set((kvindex-maxindex) / maxindex);
+ progress.set((kvindex-maxindex) / (float)maxindex);
}
kvindex += 1;
return true;
@@ -741,7 +741,7 @@ public class PipelinedSorter extends ExternalSorter {
}
}
- private class SortTask implements Callable<SpanIterator> {
+ private static class SortTask implements Callable<SpanIterator> {
private final SortSpan sortable;
private final IndexedSorter sorter;
private final RawComparator comparator;
@@ -800,7 +800,7 @@ public class PipelinedSorter extends ExternalSorter {
}
}
- private class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
+ private static class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
public SpanHeap() {
super(256);
}
@@ -814,7 +814,6 @@ public class PipelinedSorter extends ExternalSorter {
}
private class SpanMerger implements PartitionedRawKeyValueIterator {
- private final RawComparator comparator;
InputByteBuffer key = new InputByteBuffer();
InputByteBuffer value = new InputByteBuffer();
int partition;
@@ -827,11 +826,9 @@ public class PipelinedSorter extends ExternalSorter {
private int gallop = 0;
private SpanIterator horse;
private long total = 0;
- private long count = 0;
private long eq = 0;
- public SpanMerger(RawComparator comparator) {
- this.comparator = comparator;
+ public SpanMerger() {
partIter = new PartitionFilter(this);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 3c8e66a..ed9a59d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -224,7 +224,6 @@ public class TezMerger {
Reader reader = null;
final DataInputBuffer key = new DataInputBuffer();
- Configuration conf = null;
FileSystem fs = null;
Path file = null;
boolean preserve = false; // Signifies whether the segment should be kept after a merge is complete. Checked in the close method.
@@ -237,38 +236,37 @@ public class TezMerger {
TezCounter mapOutputsCounter = null;
- public Segment(Configuration conf, FileSystem fs, Path file,
+ public Segment(FileSystem fs, Path file,
CompressionCodec codec, boolean ifileReadAhead,
int ifileReadAheadLength, int bufferSize, boolean preserve)
throws IOException {
- this(conf, fs, file, codec, ifileReadAhead, ifileReadAheadLength,
+ this(fs, file, codec, ifileReadAhead, ifileReadAheadLength,
bufferSize, preserve, null);
}
- public Segment(Configuration conf, FileSystem fs, Path file,
+ public Segment(FileSystem fs, Path file,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLenth,
int bufferSize, boolean preserve, TezCounter mergedMapOutputsCounter)
throws IOException {
- this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec,
+ this(fs, file, 0, fs.getFileStatus(file).getLen(), codec,
ifileReadAhead, ifileReadAheadLenth, bufferSize, preserve,
mergedMapOutputsCounter);
}
- public Segment(Configuration conf, FileSystem fs, Path file,
+ public Segment(FileSystem fs, Path file,
long segmentOffset, long segmentLength,
CompressionCodec codec, boolean ifileReadAhead,
int ifileReadAheadLength, int bufferSize,
boolean preserve) throws IOException {
- this(conf, fs, file, segmentOffset, segmentLength, codec, ifileReadAhead,
+ this(fs, file, segmentOffset, segmentLength, codec, ifileReadAhead,
ifileReadAheadLength, bufferSize, preserve, null);
}
- public Segment(Configuration conf, FileSystem fs, Path file,
+ public Segment(FileSystem fs, Path file,
long segmentOffset, long segmentLength, CompressionCodec codec,
boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize,
boolean preserve, TezCounter mergedMapOutputsCounter)
throws IOException {
- this.conf = conf;
this.fs = fs;
this.file = file;
this.codec = codec;
@@ -440,7 +438,7 @@ public class TezMerger {
for (Path file : inputs) {
LOG.debug("MergeQ: adding: " + file);
- segments.add(new Segment(conf, fs, file, codec, ifileReadAhead,
+ segments.add(new Segment(fs, file, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize,
!deleteInputs,
(file.toString().endsWith(
@@ -760,7 +758,7 @@ public class TezMerger {
// Add the newly create segment to the list of segments to be merged
Segment tempSegment =
- new Segment(conf, fs, outputFile, codec, ifileReadAhead,
+ new Segment(fs, outputFile, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, false);
// Insert new merged segment into the sorted list
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 773eaba..29ffba1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -648,8 +648,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
@Override
public void run() {
spillLock.lock();
- spillThreadRunning = true;
try {
+ spillThreadRunning = true;
while (true) {
spillDone.signal();
while (!spillInProgress) {
@@ -720,7 +720,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
: kvmeta.capacity() + kvstart) / NMETA;
}
- private boolean isRLENeeded() {
+ private synchronized boolean isRLENeeded() {
return (sameKey > (0.1 * totalKeys)) || (sameKey < 0);
}
@@ -1009,10 +1009,10 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
if (indexCacheList.size() == 0) {
sameVolRename(mapOutputFile.getSpillIndexFile(0),
- mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+ mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
} else {
indexCacheList.get(0).writeToFile(
- mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
+ mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
}
return;
}
@@ -1072,7 +1072,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment s =
- new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
+ new Segment(rfs, filename[i], indexRecord.getStartOffset(),
indexRecord.getPartLength(), codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, true);
segmentList.add(i, s);
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 74095c6..2a1df40 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -413,8 +413,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
public List<Event> close() throws IOException, InterruptedException {
isShutdown.set(true);
spillLock.lock();
- LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
try {
+ LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
while (pendingSpillCount.get() != 0 && spillException == null) {
spillInProgress.await();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index f46f8f7..9031eb0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -243,6 +243,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
@Override
public void handleEvents(List<Event> inputEvents) throws IOException {
+ Shuffle shuffleLocalRef;
synchronized (this) {
if (getNumPhysicalInputs() == 0) {
throw new RuntimeException("No input events expected as numInputs is 0");
@@ -254,8 +255,9 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
pendingEvents.addAll(inputEvents);
return;
}
+ shuffleLocalRef = shuffle;
}
- shuffle.handleEvents(inputEvents);
+ shuffleLocalRef.handleEvents(inputEvents);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 0d02cb3..90e8f0d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -183,6 +183,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
@Override
public void handleEvents(List<Event> inputEvents) throws IOException {
+ ShuffleEventHandler inputEventHandlerLocalRef;
synchronized (this) {
if (getNumPhysicalInputs() == 0) {
throw new RuntimeException("No input events expected as numInputs is 0");
@@ -197,8 +198,9 @@ public class UnorderedKVInput extends AbstractLogicalInput {
pendingEvents.addAll(inputEvents);
return;
}
+ inputEventHandlerLocalRef = inputEventHandler;
}
- inputEventHandler.handleEvents(inputEvents);
+ inputEventHandlerLocalRef.handleEvents(inputEvents);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 540153a..1b9ea11 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -24,6 +24,7 @@ import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -101,7 +102,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
*/
public static class SleepProcessorConfig {
private int timeToSleepMS;
- private final Charset charSet = Charset.forName("UTF-8");
+ private final Charset charSet = Charsets.UTF_8;
public SleepProcessorConfig() {
}
@@ -114,7 +115,8 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
}
public UserPayload toUserPayload() {
- return UserPayload.create(ByteBuffer.wrap(Integer.toString(timeToSleepMS).getBytes()));
+ return UserPayload.create(ByteBuffer.wrap(Integer.toString(timeToSleepMS).getBytes(
+ charSet)));
}
public void fromUserPayload(UserPayload userPayload) throws CharacterCodingException {