You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/25 03:25:37 UTC
[1/2] TEZ-879. Fix potential synchronization issues in runtime
components. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 908512fc3 -> 0bf327c28
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
index d8682f3..8fadd89 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
@@ -36,7 +36,6 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-import com.google.common.base.Preconditions;
/**
* Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
@@ -49,34 +48,29 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
private static final Log LOG = LogFactory.getLog(SimpleFetchedInputAllocator.class);
private final Configuration conf;
- private final String uniqueIdentifier;
- private TezTaskOutputFiles fileNameAllocator;
- private LocalDirAllocator localDirAllocator;
+ private final TezTaskOutputFiles fileNameAllocator;
+ private final LocalDirAllocator localDirAllocator;
// Configuration parameters
- private long memoryLimit;
- private long maxSingleShuffleLimit;
+ private final long memoryLimit;
+ private final long maxSingleShuffleLimit;
- private volatile long usedMemory = 0;
+ private final long maxAvailableTaskMemory;
+ private final long initialMemoryAvailable;
- private long maxAvailableTaskMemory;
- private long initialMemoryAvailable =-1l;
+ private volatile long usedMemory = 0;
- public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory) {
+ public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf,
+ long maxTaskAvailableMemory, long memoryAvailable) {
this.conf = conf;
- this.uniqueIdentifier = uniqueIdentifier;
this.maxAvailableTaskMemory = maxTaskAvailableMemory;
- }
-
- @Private
- public void configureAndStart() {
- Preconditions.checkState(initialMemoryAvailable != -1,
- "Initial memory must be configured before starting");
+ this.initialMemoryAvailable = memoryAvailable;
+
this.fileNameAllocator = new TezTaskOutputFiles(conf,
uniqueIdentifier);
this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
+
// Setup configuration
final float maxInMemCopyUse = conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
@@ -86,8 +80,7 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+ maxInMemCopyUse);
}
-
- // Allow unit tests to fix Runtime memory
+
long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
@@ -98,7 +91,7 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
}
LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
-
+
final float singleShuffleMemoryLimitPercent = conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
@@ -111,10 +104,10 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
- LOG.info("BroadcastInputManager -> " + "MemoryLimit: " +
- this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+ LOG.info("SimpleInputManager -> " + "MemoryLimit: " +
+ this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
}
-
+
@Private
public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
final float maxInMemCopyUse = conf.getFloat(
@@ -130,11 +123,6 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
return memReq;
}
- @Private
- public void setInitialMemoryAvailable(long available) {
- this.initialMemoryAvailable = available;
- }
-
@Override
public synchronized FetchedInput allocate(long actualSize, long compressedSize,
InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
index 3ec839d..15bd05a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
@@ -53,9 +53,7 @@ public class TestSimpleFetchedInputAllocator {
LOG.info("InMemThreshold: " + inMemThreshold);
SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(UUID.randomUUID().toString(),
- conf, Runtime.getRuntime().maxMemory());
- inputManager.setInitialMemoryAvailable(inMemThreshold);
- inputManager.configureAndStart();
+ conf, Runtime.getRuntime().maxMemory(), inMemThreshold);
long requestSize = (long) (0.4f * inMemThreshold);
long compressedSize = 1l;
[2/2] git commit: TEZ-879. Fix potential synchronization issues in
runtime components. (sseth)
Posted by ss...@apache.org.
TEZ-879. Fix potential synchronization issues in runtime components.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0bf327c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0bf327c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0bf327c2
Branch: refs/heads/master
Commit: 0bf327c288274b0fde677bdfc373e6110f7b86e2
Parents: 908512f
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 24 19:24:59 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 24 19:24:59 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/common/TezUtils.java | 21 +++
.../org/apache/tez/common/TestTezUtils.java | 11 ++
.../runtime/api/impl/TezCountersDelegate.java | 16 +-
.../broadcast/output/FileBasedKVWriter.java | 38 +++--
.../library/common/InputAttemptIdentifier.java | 2 +-
.../common/MemoryUpdateCallbackHandler.java | 43 ++++++
.../library/common/shuffle/impl/Fetcher.java | 3 +-
.../common/shuffle/impl/MergeManager.java | 125 +++++++--------
.../common/shuffle/impl/MergeThread.java | 6 +-
.../library/common/shuffle/impl/Shuffle.java | 57 ++++---
.../common/shuffle/impl/ShuffleScheduler.java | 10 +-
.../common/sort/impl/ExternalSorter.java | 112 +++++++-------
.../common/sort/impl/PipelinedSorter.java | 35 +++--
.../common/sort/impl/dflt/DefaultSorter.java | 23 +--
.../library/input/ShuffledMergedInput.java | 74 ++++-----
.../library/input/ShuffledUnorderedKVInput.java | 152 +++++++++----------
.../library/output/OnFileSortedOutput.java | 56 ++++---
.../shuffle/common/impl/ShuffleManager.java | 100 +++++-------
.../impl/SimpleFetchedInputAllocator.java | 46 +++---
.../impl/TestSimpleFetchedInputAllocator.java | 4 +-
20 files changed, 479 insertions(+), 455 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index c7f2faf..2979d5f 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -27,6 +27,8 @@ import java.io.PrintStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
@@ -36,6 +38,7 @@ import java.util.zip.InflaterInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
@@ -223,6 +226,7 @@ public class TezUtils {
return output;
}
+ @Private
public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
ByteString.Output os = ByteString.newOutput();
DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
@@ -233,12 +237,29 @@ public class TezUtils {
return byteString;
}
+ @Private
public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
InflaterInputStream in = new InflaterInputStream(byteString.newInput());
byte[] bytes = IOUtils.toByteArray(in);
return bytes;
}
+ private static final Pattern pattern = Pattern.compile("\\W");
+ @Private
+ public static final int MAX_VERTEX_NAME_LENGTH = 40;
+
+ @Private
+ public static String cleanVertexName(String vertexName) {
+   String sanitized = sanitizeString(vertexName); // may be shorter: one '_' per matched code point
+   return sanitized.substring(0, Math.min(sanitized.length(), MAX_VERTEX_NAME_LENGTH));
+ }
+
+ private static String sanitizeString(String srcString) {
+   // Replace every match of the non-word pattern with an underscore.
+   // Names that start with a number are allowed right now.
+   return pattern.matcher(srcString).replaceAll("_");
+ }
+
public static void updateLoggers(String addend) throws FileNotFoundException {
String containerLogDir = null;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index 2d132a1..ef4efe1 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -49,6 +49,17 @@ public class TestTezUtils {
Assert.assertEquals(conf.size(), 6);
checkConf(conf);
}
+
+ @Test
+ public void testCleanVertexName() {
+   String testString = "special characters & spaces and longer than "
+       + TezUtils.MAX_VERTEX_NAME_LENGTH + " characters";
+   Assert.assertTrue(testString.length() > TezUtils.MAX_VERTEX_NAME_LENGTH);
+   String cleaned = TezUtils.cleanVertexName(testString);
+   Assert.assertTrue(cleaned.length() <= TezUtils.MAX_VERTEX_NAME_LENGTH);
+   Assert.assertFalse(cleaned.matches("(?s).*\\s.*")); // regex check; String.contains is literal
+   Assert.assertTrue(cleaned.matches("\\w+"));
+ }
private Configuration getConf() {
Configuration conf = new Configuration(false);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
index 29beac0..34692c6 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
@@ -18,6 +18,7 @@
package org.apache.tez.runtime.api.impl;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -30,8 +31,8 @@ public class TezCountersDelegate extends TezCounters {
public TezCountersDelegate(TezCounters original, String taskVertexName, String edgeVertexName,
String type) {
this.original = original;
- this.groupModifier = cleanVertexName(taskVertexName) + "_" + type + "_"
- + cleanVertexName(edgeVertexName);
+ this.groupModifier = TezUtils.cleanVertexName(taskVertexName) + "_" + type + "_"
+ + TezUtils.cleanVertexName(edgeVertexName);
}
// Should only be called while setting up Inputs / Outputs - rather than being
@@ -44,15 +45,4 @@ public class TezCountersDelegate extends TezCounters {
String modifiedGroupName = groupName + "_" + this.groupModifier;
return original.findCounter(modifiedGroupName, counterName);
}
-
-
- private static String cleanVertexName(String vertexName) {
- return sanitizeString(vertexName).substring(0,
- vertexName.length() > 40 ? 40 : vertexName.length());
- }
-
- private static String sanitizeString(String srcString) {
- String res = srcString.replaceAll("[^A-za-z0-9_]", "_");
- return res; // Number starts allowed rightnow
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index c5e6cc1..880420a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -54,17 +54,17 @@ public class FileBasedKVWriter implements KeyValueWriter {
private int numRecords = 0;
@SuppressWarnings("rawtypes")
- private Class keyClass;
+ private final Class keyClass;
@SuppressWarnings("rawtypes")
- private Class valClass;
- private CompressionCodec codec;
- private FileSystem rfs;
- private IFile.Writer writer;
+ private final Class valClass;
+ private final CompressionCodec codec;
+ private final FileSystem rfs;
+ private final IFile.Writer writer;
- private Path outputPath;
+ private final Path outputPath;
private Path indexPath;
- private TezTaskOutput ouputFileManager;
+ private final TezTaskOutput ouputFileManager;
private boolean closed = false;
// Number of output key-value pairs
@@ -110,7 +110,16 @@ public class FileBasedKVWriter implements KeyValueWriter {
LOG.info("Created KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec"
: codec.getClass().getName()));
- initWriter();
+ this.outputPath = ouputFileManager.getOutputFileForWrite();
+ LOG.info("Writing data file: " + outputPath);
+
+ // TODO NEWTEZ Consider making the buffer size configurable. Also consider
+ // setting up an in-memory buffer which is occasionally flushed to disk so
+ // that the output does not block.
+
+ // TODO NEWTEZ maybe use appropriate counter
+ this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass,
+ codec, null, outputBytesCounter);
}
/**
@@ -141,19 +150,6 @@ public class FileBasedKVWriter implements KeyValueWriter {
this.outputRecordsCounter.increment(1);
numRecords++;
}
-
- public void initWriter() throws IOException {
- this.outputPath = ouputFileManager.getOutputFileForWrite();
- LOG.info("Writing data file: " + outputPath);
-
- // TODO NEWTEZ Consider making the buffer size configurable. Also consider
- // setting up an in-memory buffer which is occasionally flushed to disk so
- // that the output does not block.
-
- // TODO NEWTEZ maybe use appropriate counter
- this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass,
- codec, null, outputBytesCounter);
- }
public long getRawLength() {
Preconditions.checkState(closed, "Only available after the Writer has been closed");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index 946cb0f..4c9d525 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -31,7 +31,7 @@ public class InputAttemptIdentifier {
private final int attemptNumber;
private String pathComponent;
- public static String PATH_PREFIX = "attempt";
+ public static final String PATH_PREFIX = "attempt";
public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
this(new InputIdentifier(inputIndex), attemptNumber, null);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/MemoryUpdateCallbackHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/MemoryUpdateCallbackHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/MemoryUpdateCallbackHandler.java
new file mode 100644
index 0000000..b2dedee
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/MemoryUpdateCallbackHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
+
+import com.google.common.base.Preconditions;
+
+public class MemoryUpdateCallbackHandler implements MemoryUpdateCallback {
+  // Both fields are private and only touched from the synchronized methods below.
+  private long assignedMemory;
+  private boolean updated = false;
+  /** Records the assigned size and marks that an update has been received. */
+  @Override
+  public synchronized void memoryAssigned(long assignedSize) {
+    updated = true;
+    this.assignedMemory = assignedSize;
+  }
+  /** Returns the most recently assigned size; 0 if memoryAssigned was never called. */
+  public synchronized long getMemoryAssigned() {
+    return this.assignedMemory;
+  }
+  /** Fails with IllegalStateException (via checkState) if memoryAssigned was never called. */
+  public synchronized void validateUpdateReceived() {
+    Preconditions.checkState(updated, "Initial memory update not received");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index db72e92..0433098 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -141,7 +142,7 @@ class Fetcher extends Thread {
this.bufferSize = job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
- setName("fetcher#" + id);
+ setName("fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id);
setDaemon(true);
synchronized (Fetcher.class) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 50695c8..5a952ba 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -40,10 +40,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -60,7 +59,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
-import com.google.common.base.Preconditions;
/**
* Usage. Create instance. setInitialMemoryAvailable(long), configureAndStart()
@@ -82,28 +80,28 @@ public class MergeManager {
private final Progressable nullProgressable = new NullProgressable();
private final Combiner combiner;
- Set<MapOutput> inMemoryMergedMapOutputs =
+ private final Set<MapOutput> inMemoryMergedMapOutputs =
new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
- private IntermediateMemoryToMemoryMerger memToMemMerger;
+ private final IntermediateMemoryToMemoryMerger memToMemMerger;
- Set<MapOutput> inMemoryMapOutputs =
+ private final Set<MapOutput> inMemoryMapOutputs =
new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
- private InMemoryMerger inMemoryMerger;
+ private final InMemoryMerger inMemoryMerger;
- Set<Path> onDiskMapOutputs = new TreeSet<Path>();
- private OnDiskMerger onDiskMerger;
+ private final Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+ private final OnDiskMerger onDiskMerger;
- private long memoryLimit;
- private int postMergeMemLimit;
+ private final long memoryLimit;
+ private final int postMergeMemLimit;
private long usedMemory;
private long commitMemory;
- private int ioSortFactor;
- private long maxSingleShuffleLimit;
+ private final int ioSortFactor;
+ private final long maxSingleShuffleLimit;
- private int memToMemMergeOutputsThreshold;
- private long mergeThreshold;
+ private final int memToMemMergeOutputsThreshold;
+ private final long mergeThreshold;
- private long initialMemoryAvailable = -1;
+ private final long initialMemoryAvailable;
private final ExceptionReporter exceptionReporter;
@@ -120,13 +118,13 @@ public class MergeManager {
private final TezCounter additionalBytesWritten;
private final TezCounter additionalBytesRead;
- private CompressionCodec codec;
+ private final CompressionCodec codec;
private volatile boolean finalMergeComplete = false;
- private boolean ifileReadAhead;
- private int ifileReadAheadLength;
- private int ifileBufferSize;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+ private final int ifileBufferSize;
/**
@@ -140,11 +138,16 @@ public class MergeManager {
TezCounter spilledRecordsCounter,
TezCounter reduceCombineInputCounter,
TezCounter mergedMapOutputsCounter,
- ExceptionReporter exceptionReporter) {
+ ExceptionReporter exceptionReporter,
+ long initialMemoryAvailable,
+ CompressionCodec codec,
+ boolean ifileReadAheadEnabled,
+ int ifileReadAheadLength) {
this.inputContext = inputContext;
this.conf = conf;
this.localDirAllocator = localDirAllocator;
this.exceptionReporter = exceptionReporter;
+ this.initialMemoryAvailable = initialMemoryAvailable;
this.combiner = combiner;
@@ -161,36 +164,16 @@ public class MergeManager {
this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
- }
-
- void setInitialMemoryAvailable(long available) {
- this.initialMemoryAvailable = available;
- }
-
- @Private
- void configureAndStart() {
- Preconditions.checkState(initialMemoryAvailable != -1,
- "Initial available memory must be configured before starting");
- if (ConfigUtils.isIntermediateInputCompressed(conf)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- } else {
- codec = null;
- }
- this.ifileReadAhead = conf.getBoolean(
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+ this.codec = codec;
+ this.ifileReadAhead = ifileReadAheadEnabled;
if (this.ifileReadAhead) {
- this.ifileReadAheadLength = conf.getInt(
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+ this.ifileReadAheadLength = ifileReadAheadLength;
} else {
this.ifileReadAheadLength = 0;
}
this.ifileBufferSize = conf.getInt("io.file.buffer.size",
TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
-
+
// Figure out initial memory req start
final float maxInMemCopyUse =
conf.getFloat(
@@ -221,13 +204,13 @@ public class MergeManager {
} else {
this.memoryLimit = memLimit;
}
-
+
if (this.initialMemoryAvailable < maxRedBuffer) {
this.postMergeMemLimit = (int) this.initialMemoryAvailable;
} else {
this.postMergeMemLimit = maxRedBuffer;
}
-
+
LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
+ ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem="
+ this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
@@ -236,7 +219,7 @@ public class MergeManager {
conf.getInt(
TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_FACTOR);
-
+
final float singleShuffleMemoryLimitPercent =
conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
@@ -264,34 +247,40 @@ public class MergeManager {
"mergeThreshold=" + mergeThreshold + ", " +
"ioSortFactor=" + ioSortFactor + ", " +
"memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-
+
if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
throw new RuntimeException("Invlaid configuration: "
+ "maxSingleShuffleLimit should be less than mergeThreshold"
+ "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
+ "mergeThreshold: " + this.mergeThreshold);
}
-
+
boolean allowMemToMemMerge =
- conf.getBoolean(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
- if (allowMemToMemMerge) {
- this.memToMemMerger =
- new IntermediateMemoryToMemoryMerger(this,
- memToMemMergeOutputsThreshold);
- this.memToMemMerger.start();
- } else {
- this.memToMemMerger = null;
+ conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+ if (allowMemToMemMerge) {
+ this.memToMemMerger =
+ new IntermediateMemoryToMemoryMerger(this,
+ memToMemMergeOutputsThreshold);
+ } else {
+ this.memToMemMerger = null;
+ }
+
+ this.inMemoryMerger = new InMemoryMerger(this);
+
+ this.onDiskMerger = new OnDiskMerger(this);
+ }
+
+ @Private
+ void configureAndStart() {
+ if (this.memToMemMerger != null) {
+ memToMemMerger.start();
}
-
- this.inMemoryMerger = new InMemoryMerger(this);
this.inMemoryMerger.start();
-
- this.onDiskMerger = new OnDiskMerger(this);
this.onDiskMerger.start();
}
-
+
/**
* Exposing this to get an initial memory ask without instantiating the object.
*/
@@ -492,7 +481,7 @@ public class MergeManager {
public IntermediateMemoryToMemoryMerger(MergeManager manager,
int mergeFactor) {
super(manager, mergeFactor, exceptionReporter);
- setName("MemToMemMerger [" + inputContext.getSourceVertexName() + "]");
+ setName("MemToMemMerger [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
setDaemon(true);
}
@@ -547,7 +536,7 @@ public class MergeManager {
public InMemoryMerger(MergeManager manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName("MemtoDiskMerger [" + inputContext.getSourceVertexName() + "]");
+ setName("MemtoDiskMerger [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
setDaemon(true);
}
@@ -646,7 +635,7 @@ public class MergeManager {
public OnDiskMerger(MergeManager manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName("DiskToDiskMerger [" + inputContext.getSourceVertexName() + "]");
+ setName("DiskToDiskMerger [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
setDaemon(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
index d8a7722..aed2628 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
@@ -31,7 +31,7 @@ abstract class MergeThread<T> extends Thread {
private static final Log LOG = LogFactory.getLog(MergeThread.class);
private volatile boolean inProgress = false;
- private List<T> inputs = new ArrayList<T>();
+ private final List<T> inputs = new ArrayList<T>();
protected final MergeManager manager;
private final ExceptionReporter reporter;
private boolean closed = false;
@@ -56,8 +56,8 @@ abstract class MergeThread<T> extends Thread {
public synchronized void startMerge(Set<T> inputs) {
if (!closed) {
+ this.inputs.clear();
inProgress = true;
- this.inputs = new ArrayList<T>();
Iterator<T> iter=inputs.iterator();
for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
this.inputs.add(iter.next());
@@ -95,7 +95,7 @@ abstract class MergeThread<T> extends Thread {
} finally {
synchronized (this) {
// Clear inputs
- inputs = null;
+ inputs.clear();
inProgress = false;
notifyAll();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 1bc3903..a27fa31 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -37,12 +38,12 @@ import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -58,7 +59,7 @@ import com.google.common.base.Preconditions;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
+public class Shuffle implements ExceptionReporter {
private static final Log LOG = LogFactory.getLog(Shuffle.class);
private static final int PROGRESS_FREQUENCY = 2000;
@@ -67,35 +68,28 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
private final TezInputContext inputContext;
private final int numInputs;
- private ShuffleClientMetrics metrics;
+ private final ShuffleClientMetrics metrics;
- private ShuffleInputEventHandler eventHandler;
- private ShuffleScheduler scheduler;
- private MergeManager merger;
+ private final ShuffleInputEventHandler eventHandler;
+ private final ShuffleScheduler scheduler;
+ private final MergeManager merger;
+
+ private final SecretKey jobTokenSecret;
+ private final CompressionCodec codec;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+
private Throwable throwable = null;
private String throwingThreadName = null;
-
- private SecretKey jobTokenSecret;
- private CompressionCodec codec;
- private boolean ifileReadAhead;
- private int ifileReadAheadLength;
-
- private volatile long initialMemoryAvailable = -1;
private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
- public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+ public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs,
+ long initialMemoryAvailable) throws IOException {
this.inputContext = inputContext;
this.conf = conf;
this.numInputs = numInputs;
- long initialMemRequested = MergeManager.getInitialMemoryRequirement(conf,
- inputContext.getTotalMemoryAvailableToTask());
- inputContext.requestInitialMemory(initialMemRequested, this);
- }
- private void configureAndStart() throws IOException {
- Preconditions.checkState(initialMemoryAvailable != -1,
- "Initial Available memory must be configured before starting");
this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
@@ -175,9 +169,11 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
spilledRecordsCounter,
reduceCombineInputCounter,
mergedMapOutputsCounter,
- this);
- merger.setInitialMemoryAvailable(initialMemoryAvailable);
- merger.configureAndStart();
+ this,
+ initialMemoryAvailable,
+ codec,
+ ifileReadAhead,
+ ifileReadAheadLength);
}
public void handleEvents(List<Event> events) {
@@ -225,10 +221,11 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
}
+ /**
+ * Starts the merger and then launches the shuffle on a new thread whose name
+ * includes the cleaned source vertex name.
+ *
+ * @throws IOException declared by this method; presumably propagated from
+ * merger.configureAndStart() - TODO confirm
+ */
public void run() throws IOException {
- configureAndStart();
+ merger.configureAndStart();
RunShuffleCallable runShuffle = new RunShuffleCallable();
runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
- new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
+ // Clean only the vertex name; the closing bracket is appended afterwards,
+ // matching the other thread names set in this change.
+ new Thread(runShuffleFuture, "ShuffleMergeRunner ["
+ + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]").start();
}
private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
@@ -294,6 +291,7 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
}
}
+ @Private
public synchronized void reportException(Throwable t) {
if (throwable == null) {
throwable = t;
@@ -314,9 +312,8 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
}
}
- @Override
- public void memoryAssigned(long assignedSize) {
- this.initialMemoryAvailable = assignedSize;
+ @Private
+ public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+ return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index d53c6a6..a5b79fb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.Event;
@@ -73,7 +74,7 @@ class ShuffleScheduler {
private final Random random = new Random(System.currentTimeMillis());
private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
- private final Referee referee = new Referee();
+ private final Referee referee;
private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
new HashMap<InputAttemptIdentifier,IntWritable>();
private final Map<String,IntWritable> hostFailures =
@@ -116,6 +117,7 @@ class ShuffleScheduler {
abortFailureLimit = Math.max(30, numberOfInputs / 10);
remainingMaps = numberOfInputs;
finishedMaps = new boolean[remainingMaps]; // default init to false
+ this.referee = new Referee();
this.shuffle = shuffle;
this.shuffledInputsCounter = shuffledInputsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
@@ -159,8 +161,7 @@ class ShuffleScheduler {
long milis,
MapOutput output
) throws IOException {
- String taskIdentifier = TezRuntimeUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber());
- failureCounts.remove(taskIdentifier);
+ failureCounts.remove(srcAttemptIdentifier);
if (host != null) {
hostFailures.remove(host.getHostName());
}
@@ -530,7 +531,8 @@ class ShuffleScheduler {
*/
private class Referee extends Thread {
public Referee() {
- setName("ShufflePenaltyReferee");
+ setName("ShufflePenaltyReferee ["
+ + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
setDaemon(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index ab8a869..67e7337 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -56,7 +55,7 @@ import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
import com.google.common.base.Preconditions;
@SuppressWarnings({"unchecked", "rawtypes"})
-public abstract class ExternalSorter implements MemoryUpdateCallback {
+public abstract class ExternalSorter {
private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
@@ -65,77 +64,80 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
public abstract void flush() throws IOException;
public abstract void write(Object key, Object value) throws IOException;
+
+ protected final Progressable nullProgressable = new NullProgressable();
+ protected final TezOutputContext outputContext;
+ protected final Combiner combiner;
+ protected final Partitioner partitioner;
+ protected final Configuration conf;
+ protected final FileSystem rfs;
+ protected final TezTaskOutput mapOutputFile;
+ protected final int partitions;
+ protected final Class keyClass;
+ protected final Class valClass;
+ protected final RawComparator comparator;
+ protected final SerializationFactory serializationFactory;
+ protected final Serializer keySerializer;
+ protected final Serializer valSerializer;
- private int initialMemRequestMb;
- protected Progressable nullProgressable = new NullProgressable();
- protected TezOutputContext outputContext;
- protected Combiner combiner;
- protected Partitioner partitioner;
- protected Configuration conf;
- protected FileSystem rfs;
- protected TezTaskOutput mapOutputFile;
- protected int partitions;
- protected Class keyClass;
- protected Class valClass;
- protected RawComparator comparator;
- protected SerializationFactory serializationFactory;
- protected Serializer keySerializer;
- protected Serializer valSerializer;
-
- protected boolean ifileReadAhead;
- protected int ifileReadAheadLength;
- protected int ifileBufferSize;
+ protected final boolean ifileReadAhead;
+ protected final int ifileReadAheadLength;
+ protected final int ifileBufferSize;
- protected volatile int availableMemoryMb;
+ protected final int availableMemoryMb;
- protected IndexedSorter sorter;
+ protected final IndexedSorter sorter;
// Compression for map-outputs
- protected CompressionCodec codec;
+ protected final CompressionCodec codec;
// Counters
// MR compatibility layer needs to rename counters back to what MR requires.
// Represents final deserialized size of output (spills are not counted)
- protected TezCounter mapOutputByteCounter;
+ protected final TezCounter mapOutputByteCounter;
// Represents final number of records written (spills are not counted)
- protected TezCounter mapOutputRecordCounter;
+ protected final TezCounter mapOutputRecordCounter;
// Represents the size of the final output - with any overheads introduced by
// the storage/serialization mechanism. This is an uncompressed data size.
- protected TezCounter outputBytesWithOverheadCounter;
+ protected final TezCounter outputBytesWithOverheadCounter;
// Represents the size of the final output - which will be transmitted over
// the wire (spills are not counted). Factors in compression if it is enabled.
- protected TezCounter fileOutputByteCounter;
+ protected final TezCounter fileOutputByteCounter;
// Represents total number of records written to disk (includes spills. Min
// value for this is equal to number of output records)
- protected TezCounter spilledRecordsCounter;
+ protected final TezCounter spilledRecordsCounter;
// Bytes written as a result of additional spills. The single spill for the
// final output data is not considered. (This will be 0 if there's no
// additional spills. Compressed size - so may not represent the size in the
// sort buffer)
- protected TezCounter additionalSpillBytesWritten;
+ protected final TezCounter additionalSpillBytesWritten;
- protected TezCounter additionalSpillBytesRead;
+ protected final TezCounter additionalSpillBytesRead;
// Number of additional spills. (This will be 0 if there's no additional
// spills)
- protected TezCounter numAdditionalSpills;
+ protected final TezCounter numAdditionalSpills;
- @Private
- public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+ public ExternalSorter(TezOutputContext outputContext, Configuration conf, int numOutputs,
+ long initialMemoryAvailable) throws IOException {
this.outputContext = outputContext;
this.conf = conf;
this.partitions = numOutputs;
rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
- initialMemRequestMb =
- this.conf.getInt(
- TezJobConfig.TEZ_RUNTIME_IO_SORT_MB,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
- Preconditions.checkArgument(initialMemRequestMb != 0, "io.sort.mb should be larger than 0");
- long reqBytes = initialMemRequestMb << 20;
- outputContext.requestInitialMemory(reqBytes, this);
- LOG.info("Requested SortBufferSize (io.sort.mb): " + initialMemRequestMb);
+ int assignedMb = (int) (initialMemoryAvailable >> 20);
+ if (assignedMb <= 0) {
+ if (initialMemoryAvailable > 0) { // Rounded down to 0MB - may be > 0 && < 1MB
+ this.availableMemoryMb = 1;
+ LOG.warn("initialAvailableMemory: " + initialMemoryAvailable
+ + " is too low. Rounding to 1 MB");
+ } else {
+ throw new RuntimeException("InitialMemoryAssigned is <= 0: " + initialMemoryAvailable);
+ }
+ } else {
+ this.availableMemoryMb = assignedMb;
+ }
// sorter
sorter = ReflectionUtils.newInstance(this.conf.getClass(
@@ -192,13 +194,6 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
}
-
- /**
- * Used to start the actual Output. Typically, this involves allocating
- * buffers, starting required threads, etc
- */
- @Private
- public abstract void start() throws Exception;
/**
* Exception indicating that the allocated sort buffer is insufficient to hold
@@ -253,14 +248,15 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
public ShuffleHeader getShuffleHeader(int reduce) {
throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
}
-
- @Override
- public void memoryAssigned(long assignedSize) {
- this.availableMemoryMb = (int) (assignedSize >> 20);
- if (this.availableMemoryMb == 0) {
- LOG.warn("AssignedMemoryMB: " + this.availableMemoryMb
- + " is too low. Falling back to initial ask: " + initialMemRequestMb);
- this.availableMemoryMb = initialMemRequestMb;
- }
+
+ /**
+ * Computes the initial sort-buffer memory request, in bytes, from the
+ * configured io.sort.mb value.
+ *
+ * @param conf configuration read for TezJobConfig.TEZ_RUNTIME_IO_SORT_MB
+ * @param maxAvailableTaskMemory not used by this computation
+ * @return the requested sort buffer size in bytes
+ * @throws IllegalArgumentException if the configured size is not larger than 0
+ */
+ public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+ int initialMemRequestMb =
+ conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_IO_SORT_MB,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+ Preconditions.checkArgument(initialMemRequestMb > 0, "io.sort.mb should be larger than 0");
+ // Widen to long before shifting: an int shift overflows for io.sort.mb >= 2048.
+ long reqBytes = ((long) initialMemRequestMb) << 20;
+ LOG.info("Requested SortBufferSize (io.sort.mb): " + initialMemRequestMb);
+ return reqBytes;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index fedbcb5..54a6e8e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@@ -43,10 +44,14 @@ import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
@SuppressWarnings({"unchecked", "rawtypes"})
public class PipelinedSorter extends ExternalSorter {
@@ -58,12 +63,8 @@ public class PipelinedSorter extends ExternalSorter {
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
private final static int APPROX_HEADER_LENGTH = 150;
-
- boolean ifileReadAhead;
- int ifileReadAheadLength;
- int ifileBufferSize;
-
- int partitionBits;
+
+ private final int partitionBits;
private static final int PARTITION = 0; // partition offset in acct
private static final int KEYSTART = 1; // key offset in acct
@@ -76,24 +77,25 @@ public class PipelinedSorter extends ExternalSorter {
volatile Throwable sortSpillException = null;
int numSpills = 0;
- int minSpillsForCombine;
- private HashComparator hasher;
+ private final int minSpillsForCombine;
+ private final HashComparator hasher;
// SortSpans
private SortSpan span;
private ByteBuffer largeBuffer;
// Merger
- private SpanMerger merger;
- private ExecutorService sortmaster;
+ private final SpanMerger merger;
+ private final ExecutorService sortmaster;
- final ArrayList<TezSpillRecord> indexCacheList =
+ private final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
private int totalIndexCacheMemory;
private int indexCacheMemoryLimit;
// TODO Set additional counters - total bytes written, spills etc.
-
- @Override
- public void start() throws IOException {
+
+ public PipelinedSorter(TezOutputContext outputContext, Configuration conf, int numOutputs,
+ long initialMemoryAvailable) throws IOException {
+ super(outputContext, conf, numOutputs, initialMemoryAvailable);
partitionBits = bitcount(partitions)+1;
@@ -126,7 +128,10 @@ public class PipelinedSorter extends ExternalSorter {
this.conf.getInt(
TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS);
- sortmaster = Executors.newFixedThreadPool(sortThreads);
+ sortmaster = Executors.newFixedThreadPool(sortThreads,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Sorter [" + TezUtils.cleanVertexName(outputContext.getDestinationVertexName()) + "] #%d")
+ .build());
// k/v serialization
if(comparator instanceof HashComparator) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 5264554..912bf8b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@@ -39,6 +40,8 @@ import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -64,7 +67,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
private final static int APPROX_HEADER_LENGTH = 150;
// k/v accounting
- IntBuffer kvmeta; // metadata overlay on backing store
+ private final IntBuffer kvmeta; // metadata overlay on backing store
int kvstart; // marks origin of spill metadata
int kvend; // marks end of spill metadata
int kvindex; // marks end of fully serialized records
@@ -77,7 +80,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
int bufvoid; // marks the point where we should stop
// reading at the end of the buffer
- byte[] kvbuffer; // main output buffer
+ private final byte[] kvbuffer; // main output buffer
private final byte[] b0 = new byte[0];
protected static final int VALSTART = 0; // val offset in acct
@@ -88,14 +91,14 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
protected static final int METASIZE = NMETA * 4; // size in bytes
// spill accounting
- int maxRec;
- int softLimit;
+ final int maxRec;
+ final int softLimit;
boolean spillInProgress;
int bufferRemaining;
volatile Throwable sortSpillException = null;
int numSpills = 0;
- int minSpillsForCombine;
+ final int minSpillsForCombine;
final ReentrantLock spillLock = new ReentrantLock();
final Condition spillDone = spillLock.newCondition();
final Condition spillReady = spillLock.newCondition();
@@ -105,11 +108,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
+ private final int indexCacheMemoryLimit;
private int totalIndexCacheMemory;
- private int indexCacheMemoryLimit;
- @Override
- public void start() throws IOException {
+ public DefaultSorter(TezOutputContext outputContext, Configuration conf, int numOutputs,
+ long initialMemoryAvailable) throws IOException {
+ super(outputContext, conf, numOutputs, initialMemoryAvailable);
// sanity checks
final float spillper = this.conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
@@ -156,7 +160,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
spillInProgress = false;
minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true);
- spillThread.setName("SpillThread");
+ // Name the spill thread after the cleaned destination vertex name; the
+ // closing bracket is appended after cleaning, not passed through it.
+ spillThread.setName("SpillThread ["
+ + TezUtils.cleanVertexName(outputContext.getDestinationVertexName()) + "]");
spillLock.lock();
try {
spillThread.start();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index c55012f..4d62346 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -38,10 +38,13 @@ import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.ValuesIterator;
import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import com.google.common.base.Preconditions;
+
/**
* <code>ShuffledMergedInput</code> is a {@link LogicalInput} which shuffles
@@ -63,8 +66,9 @@ public class ShuffledMergedInput implements LogicalInput {
protected Configuration conf;
protected int numInputs = 0;
protected Shuffle shuffle;
+ protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
- private volatile long firstEventReceivedTime = -1;
+ private long firstEventReceivedTime = -1;
@SuppressWarnings("rawtypes")
protected ValuesIterator vIter;
@@ -74,7 +78,7 @@ public class ShuffledMergedInput implements LogicalInput {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@Override
- public List<Event> initialize(TezInputContext inputContext) throws IOException {
+ public synchronized List<Event> initialize(TezInputContext inputContext) throws IOException {
this.inputContext = inputContext;
this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
@@ -86,34 +90,38 @@ public class ShuffledMergedInput implements LogicalInput {
+ inputContext.getSourceVertexName());
return Collections.emptyList();
}
+
+ long initialMemoryRequest = Shuffle.getInitialMemoryRequirement(conf,
+ inputContext.getTotalMemoryAvailableToTask());
+ this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+ inputContext.requestInitialMemory(initialMemoryRequest, memoryUpdateCallbackHandler);
this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
this.inputValueCounter = inputContext.getCounters().findCounter(
TaskCounter.REDUCE_INPUT_RECORDS);
this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
- shuffle = new Shuffle(inputContext, this.conf, numInputs);
return Collections.emptyList();
}
-
+
@Override
- public void start() throws IOException {
- synchronized (this) {
- if (!isStarted.get()) {
- // Start the shuffle - copy and merge
- shuffle.run();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
- }
- List<Event> pending = new LinkedList<Event>();
- pendingEvents.drainTo(pending);
- if (pending.size() > 0) {
- LOG.info("NoAutoStart delay in processing first event: "
- + (System.currentTimeMillis() - firstEventReceivedTime));
- shuffle.handleEvents(pending);
- }
- isStarted.set(true);
+ public synchronized void start() throws IOException {
+ if (!isStarted.get()) {
+ memoryUpdateCallbackHandler.validateUpdateReceived();
+ // Start the shuffle - copy and merge
+ shuffle = new Shuffle(inputContext, conf, numInputs, memoryUpdateCallbackHandler.getMemoryAssigned());
+ shuffle.run();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
+ }
+ List<Event> pending = new LinkedList<Event>();
+ pendingEvents.drainTo(pending);
+ if (pending.size() > 0) {
+ LOG.info("NoAutoStart delay in processing first event: "
+ + (System.currentTimeMillis() - firstEventReceivedTime));
+ shuffle.handleEvents(pending);
}
+ isStarted.set(true);
}
}
@@ -124,7 +132,8 @@ public class ShuffledMergedInput implements LogicalInput {
* processing fetching the input. false if the shuffle and merge are
* still in progress
*/
- public boolean isInputReady() {
+ public synchronized boolean isInputReady() {
+ Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
if (this.numInputs == 0) {
return true;
}
@@ -136,7 +145,8 @@ public class ShuffledMergedInput implements LogicalInput {
* @throws IOException
* @throws InterruptedException
*/
- public void waitForInputReady() throws IOException, InterruptedException {
+ public synchronized void waitForInputReady() throws IOException, InterruptedException {
+ Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
if (this.numInputs == 0) {
return;
}
@@ -145,7 +155,7 @@ public class ShuffledMergedInput implements LogicalInput {
}
@Override
- public List<Event> close() throws IOException {
+ public synchronized List<Event> close() throws IOException {
if (this.numInputs != 0 && rawIter != null) {
rawIter.close();
}
@@ -165,7 +175,7 @@ public class ShuffledMergedInput implements LogicalInput {
* @return a KVReader over the sorted input.
*/
@Override
- public KeyValuesReader getReader() throws IOException {
+ public synchronized KeyValuesReader getReader() throws IOException {
if (this.numInputs == 0) {
return new KeyValuesReader() {
@Override
@@ -211,26 +221,22 @@ public class ShuffledMergedInput implements LogicalInput {
}
@Override
- public void handleEvents(List<Event> inputEvents) {
+ public synchronized void handleEvents(List<Event> inputEvents) {
if (numInputs == 0) {
throw new RuntimeException("No input events expected as numInputs is 0");
}
if (!isStarted.get()) {
- synchronized (this) {
- if (!isStarted.get()) {
- if (firstEventReceivedTime == -1) {
- firstEventReceivedTime = System.currentTimeMillis();
- }
- pendingEvents.addAll(inputEvents);
- return;
- }
+ if (firstEventReceivedTime == -1) {
+ firstEventReceivedTime = System.currentTimeMillis();
}
+ pendingEvents.addAll(inputEvents);
+ return;
}
shuffle.handleEvents(inputEvents);
}
@Override
- public void setNumPhysicalInputs(int numInputs) {
+ public synchronized void setNumPhysicalInputs(int numInputs) {
this.numInputs = numInputs;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index bf0ed11..a07159f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -38,10 +38,10 @@ import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.readers.ShuffledUnorderedKVReader;
import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleInputEventHandlerImpl;
@@ -49,7 +49,7 @@ import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
import com.google.common.base.Preconditions;
-public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallback {
+public class ShuffledUnorderedKVInput implements LogicalInput {
private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
@@ -58,7 +58,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
private TezInputContext inputContext;
private ShuffleManager shuffleManager;
private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
- private volatile long firstEventReceivedTime = -1;
+ private long firstEventReceivedTime = -1;
+ private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
@SuppressWarnings("rawtypes")
private ShuffledUnorderedKVReader kvReader;
@@ -67,14 +68,12 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
private SimpleFetchedInputAllocator inputManager;
private ShuffleEventHandler inputEventHandler;
-
- private volatile long initialMemoryAvailable = -1;
-
+
public ShuffledUnorderedKVInput() {
}
@Override
- public List<Event> initialize(TezInputContext inputContext) throws Exception {
+ public synchronized List<Event> initialize(TezInputContext inputContext) throws Exception {
Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
this.inputContext = inputContext;
this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
@@ -88,7 +87,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
return Collections.emptyList();
} else {
long initalMemReq = getInitialMemoryReq();
- this.inputContext.requestInitialMemory(initalMemReq, this);
+ memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+ this.inputContext.requestInitialMemory(initalMemReq, memoryUpdateCallbackHandler);
}
this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
@@ -98,67 +98,60 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
}
@Override
- public void start() throws IOException {
- synchronized (this) {
- if (!isStarted.get()) {
- ////// Initial configuration
- Preconditions.checkState(initialMemoryAvailable != -1,
- "Initial memory available must be configured before starting");
- CompressionCodec codec;
- if (ConfigUtils.isIntermediateInputCompressed(conf)) {
- Class<? extends CompressionCodec> codecClass = ConfigUtils
- .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- } else {
- codec = null;
- }
-
- boolean ifileReadAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
- int ifileReadAheadLength = 0;
- int ifileBufferSize = 0;
-
- if (ifileReadAhead) {
- ifileReadAheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
- }
- ifileBufferSize = conf.getInt("io.file.buffer.size",
- TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
-
- this.shuffleManager = new ShuffleManager(inputContext, conf, numInputs);
-
- this.inputManager = new SimpleFetchedInputAllocator(inputContext.getUniqueIdentifier(), conf,
- inputContext.getTotalMemoryAvailableToTask());
- inputManager.setInitialMemoryAvailable(initialMemoryAvailable);
- inputManager.configureAndStart();
-
- this.inputEventHandler = new ShuffleInputEventHandlerImpl(
- inputContext, shuffleManager, inputManager, codec, ifileReadAhead,
- ifileReadAheadLength);
-
- this.shuffleManager.setCompressionCodec(codec);
- this.shuffleManager.setIfileParameters(ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
- this.shuffleManager.setFetchedInputAllocator(inputManager);
- this.shuffleManager.setInputEventHandler(inputEventHandler);
- ////// End of Initial configuration
-
- this.shuffleManager.run();
- this.kvReader = createReader(inputRecordCounter, codec,
- ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
- List<Event> pending = new LinkedList<Event>();
- pendingEvents.drainTo(pending);
- if (pending.size() > 0) {
- LOG.info("NoAutoStart delay in processing first event: "
- + (System.currentTimeMillis() - firstEventReceivedTime));
- shuffleManager.handleEvents(pending);
- }
- isStarted.set(true);
+ public synchronized void start() throws IOException {
+ if (!isStarted.get()) {
+ ////// Initial configuration
+ memoryUpdateCallbackHandler.validateUpdateReceived();
+ CompressionCodec codec;
+ if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+ Class<? extends CompressionCodec> codecClass = ConfigUtils
+ .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } else {
+ codec = null;
+ }
+
+ boolean ifileReadAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+ int ifileReadAheadLength = 0;
+ int ifileBufferSize = 0;
+
+ if (ifileReadAhead) {
+ ifileReadAheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
}
+ ifileBufferSize = conf.getInt("io.file.buffer.size",
+ TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+
+ this.inputManager = new SimpleFetchedInputAllocator(inputContext.getUniqueIdentifier(), conf,
+ inputContext.getTotalMemoryAvailableToTask(),
+ memoryUpdateCallbackHandler.getMemoryAssigned());
+
+ this.shuffleManager = new ShuffleManager(inputContext, conf, numInputs, ifileBufferSize,
+ ifileReadAhead, ifileReadAheadLength, codec, inputManager);
+
+ this.inputEventHandler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager,
+ inputManager, codec, ifileReadAhead, ifileReadAheadLength);
+
+ ////// End of Initial configuration
+
+ this.shuffleManager.run();
+ this.kvReader = createReader(inputRecordCounter, codec,
+ ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
+ List<Event> pending = new LinkedList<Event>();
+ pendingEvents.drainTo(pending);
+ if (pending.size() > 0) {
+ LOG.info("NoAutoStart delay in processing first event: "
+ + (System.currentTimeMillis() - firstEventReceivedTime));
+ inputEventHandler.handleEvents(pending);
+ }
+ isStarted.set(true);
}
}
@Override
- public KeyValueReader getReader() throws Exception {
+ public synchronized KeyValueReader getReader() throws Exception {
+ Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
if (numInputs == 0) {
return new KeyValueReader() {
@Override
@@ -181,29 +174,25 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
}
@Override
- public void handleEvents(List<Event> inputEvents) throws IOException {
+ public synchronized void handleEvents(List<Event> inputEvents) throws IOException {
if (numInputs == 0) {
throw new RuntimeException("No input events expected as numInputs is 0");
}
if (!isStarted.get()) {
- synchronized(this) {
- if (!isStarted.get()) {
- if (firstEventReceivedTime == -1) {
- firstEventReceivedTime = System.currentTimeMillis();
- }
- // This queue will keep growing if the Processor decides never to
- // start the event. The Input, however has no idea, on whether start
- // will be invoked or not.
- pendingEvents.addAll(inputEvents);
- return;
- }
+ if (firstEventReceivedTime == -1) {
+ firstEventReceivedTime = System.currentTimeMillis();
}
+ // This queue will keep growing if the Processor decides never to
+ // start the Input. The Input, however, has no way of knowing whether
+ // start will be invoked or not.
+ pendingEvents.addAll(inputEvents);
+ return;
}
- shuffleManager.handleEvents(inputEvents);
+ inputEventHandler.handleEvents(inputEvents);
}
@Override
- public List<Event> close() throws Exception {
+ public synchronized List<Event> close() throws Exception {
if (this.shuffleManager != null) {
this.shuffleManager.shutdown();
}
@@ -211,15 +200,10 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
}
@Override
- public void setNumPhysicalInputs(int numInputs) {
+ public synchronized void setNumPhysicalInputs(int numInputs) {
this.numInputs = numInputs;
}
- @Override
- public void memoryAssigned(long assignedSize) {
- this.initialMemoryAvailable = assignedSize;
- }
-
private long getInitialMemoryReq() {
return SimpleFetchedInputAllocator.getInitialMemoryReq(conf,
inputContext.getTotalMemoryAvailableToTask());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 3dd41e4..dfd238c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -37,6 +37,7 @@ import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -46,6 +47,7 @@ import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
@@ -61,13 +63,14 @@ public class OnFileSortedOutput implements LogicalOutput {
protected Configuration conf;
protected int numOutputs;
protected TezOutputContext outputContext;
+ protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
private long startTime;
private long endTime;
private boolean sendEmptyPartitionDetails;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@Override
- public List<Event> initialize(TezOutputContext outputContext)
+ public synchronized List<Event> initialize(TezOutputContext outputContext)
throws IOException {
this.startTime = System.nanoTime();
this.outputContext = outputContext;
@@ -76,30 +79,36 @@ public class OnFileSortedOutput implements LogicalOutput {
// places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
// TezMerger, etc.
this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
-
- if (this.conf.getInt(TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS) > 1) {
- sorter = new PipelinedSorter();
- } else {
- sorter = new DefaultSorter();
- }
+ this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+ outputContext.requestInitialMemory(
+ ExternalSorter.getInitialMemoryRequirement(conf,
+ outputContext.getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
sendEmptyPartitionDetails = this.conf.getBoolean(
TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
- sorter.initialize(outputContext, conf, numOutputs);
return Collections.emptyList();
}
@Override
- public void start() throws Exception {
- if (!isStarted.getAndSet(true)) {
- sorter.start();
+ public synchronized void start() throws Exception {
+ if (!isStarted.get()) {
+ memoryUpdateCallbackHandler.validateUpdateReceived();
+ if (this.conf.getInt(TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS) > 1) {
+ sorter = new PipelinedSorter(outputContext, conf, numOutputs,
+ memoryUpdateCallbackHandler.getMemoryAssigned());
+ } else {
+ sorter = new DefaultSorter(outputContext, conf, numOutputs,
+ memoryUpdateCallbackHandler.getMemoryAssigned());
+ }
+ isStarted.set(true);
}
}
@Override
- public KeyValueWriter getWriter() throws IOException {
+ public synchronized KeyValueWriter getWriter() throws IOException {
+ Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
return new KeyValueWriter() {
@Override
public void write(Object key, Object value) throws IOException {
@@ -109,22 +118,27 @@ public class OnFileSortedOutput implements LogicalOutput {
}
@Override
- public void handleEvents(List<Event> outputEvents) {
+ public synchronized void handleEvents(List<Event> outputEvents) {
// Not expecting any events.
}
@Override
- public void setNumPhysicalOutputs(int numOutputs) {
+ public synchronized void setNumPhysicalOutputs(int numOutputs) {
this.numOutputs = numOutputs;
}
@Override
- public List<Event> close() throws IOException {
- sorter.flush();
- sorter.close();
- this.endTime = System.nanoTime();
-
- return generateEventsOnClose();
+ public synchronized List<Event> close() throws IOException {
+ if (sorter != null) {
+ sorter.flush();
+ sorter.close();
+ this.endTime = System.nanoTime();
+ return generateEventsOnClose();
+ } else {
+ LOG.warn("Attempting to close output " + outputContext.getDestinationVertexName()
+ + " before it was started");
+ return Collections.emptyList();
+ }
}
protected List<Event> generateEventsOnClose() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 264e628..bde9bfe 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezConfiguration;
@@ -60,7 +61,6 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.Fetcher;
import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
import org.apache.tez.runtime.library.shuffle.common.InputHost;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
@@ -85,49 +85,45 @@ public class ShuffleManager implements FetcherCallback {
private final Configuration conf;
private final int numInputs;
- private ShuffleEventHandler inputEventHandler;
- private FetchedInputAllocator inputManager;
-
- private ExecutorService fetcherRawExecutor;
- private ListeningExecutorService fetcherExecutor;
+ private final FetchedInputAllocator inputManager;
+
+ private final ListeningExecutorService fetcherExecutor;
- private ExecutorService schedulerRawExecutor;
- private ListeningExecutorService schedulerExecutor;
- private RunShuffleCallable schedulerCallable = new RunShuffleCallable();
+ private final ExecutorService schedulerRawExecutor;
+ private final ListeningExecutorService schedulerExecutor;
+ private final RunShuffleCallable schedulerCallable = new RunShuffleCallable();
- private BlockingQueue<FetchedInput> completedInputs;
- private AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
- private Set<InputIdentifier> completedInputSet;
- private ConcurrentMap<String, InputHost> knownSrcHosts;
- private BlockingQueue<InputHost> pendingHosts;
- private Set<InputAttemptIdentifier> obsoletedInputs;
+ private final BlockingQueue<FetchedInput> completedInputs;
+ private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
+ private final Set<InputIdentifier> completedInputSet;
+ private final ConcurrentMap<String, InputHost> knownSrcHosts;
+ private final BlockingQueue<InputHost> pendingHosts;
+ private final Set<InputAttemptIdentifier> obsoletedInputs;
- private AtomicInteger numCompletedInputs = new AtomicInteger(0);
+ private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
- private long startTime;
+ private final long startTime;
private long lastProgressTime;
// Required to be held when manipulating pendingHosts
- private ReentrantLock lock = new ReentrantLock();
- private Condition wakeLoop = lock.newCondition();
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition wakeLoop = lock.newCondition();
- private int numFetchers;
- private AtomicInteger numRunningFetchers = new AtomicInteger(0);
+ private final int numFetchers;
+ private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
// Parameters required by Fetchers
- private SecretKey shuffleSecret;
- private int connectionTimeout;
- private int readTimeout;
- private CompressionCodec codec;
+ private final SecretKey shuffleSecret;
+ private final int connectionTimeout;
+ private final int readTimeout;
+ private final CompressionCodec codec;
- private int ifileBufferSize;
- private boolean ifileReadAhead;
- private int ifileReadAheadLength;
+ private final int ifileBufferSize;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
-
- private volatile Throwable shuffleError;
-
+
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final TezCounter shuffledInputsCounter;
@@ -137,9 +133,13 @@ public class ShuffleManager implements FetcherCallback {
private final TezCounter bytesShuffledToDiskCounter;
private final TezCounter bytesShuffledToMemCounter;
+ private volatile Throwable shuffleError;
+
// TODO More counters - FetchErrors, speed?
- public ShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+ public ShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs,
+ int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength,
+ CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
this.inputContext = inputContext;
this.conf = conf;
this.numInputs = numInputs;
@@ -150,28 +150,13 @@ public class ShuffleManager implements FetcherCallback {
this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
- }
- public void setIfileParameters(int bufferSize, boolean readAhead, int readAheadLength) {
this.ifileBufferSize = bufferSize;
- this.ifileReadAhead = readAhead;
- this.ifileReadAheadLength = readAheadLength;
- }
-
- public void setCompressionCodec(CompressionCodec codec) {
+ this.ifileReadAhead = ifileReadAheadEnabled;
+ this.ifileReadAheadLength = ifileReadAheadLength;
this.codec = codec;
- }
-
- public void setInputEventHandler(ShuffleEventHandler eventHandler) {
- this.inputEventHandler = eventHandler;
- }
+ this.inputManager = inputAllocator;
- public void setFetchedInputAllocator(FetchedInputAllocator allocator) {
- this.inputManager = allocator;
- }
-
-
- private void configureAndStart() throws IOException {
completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
@@ -185,12 +170,12 @@ public class ShuffleManager implements FetcherCallback {
this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
- this.fetcherRawExecutor = Executors.newFixedThreadPool(
+ ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
numFetchers,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(
- "Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d")
+ "Fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #%d")
.build());
this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
@@ -199,7 +184,7 @@ public class ShuffleManager implements FetcherCallback {
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(
- "ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]")
+ "ShuffleRunner [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]")
.build());
this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
@@ -225,9 +210,7 @@ public class ShuffleManager implements FetcherCallback {
public void run() throws IOException {
Preconditions.checkState(inputManager != null, "InputManager must be configured");
- Preconditions.checkState(inputEventHandler != null, "InputEventHandler must be configured");
-
- configureAndStart();
+
ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
// Shutdown this executor once this task, and the callback complete.
@@ -444,11 +427,6 @@ public class ShuffleManager implements FetcherCallback {
obsoletedInputs.add(srcAttemptIdentifier);
// TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
}
-
-
- public void handleEvents(List<Event> events) throws IOException {
- inputEventHandler.handleEvents(events);
- }
/////////////////// End of Methods for InputEventHandler
/////////////////// Methods from FetcherCallbackHandler