You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/13 05:09:18 UTC
git commit: TEZ-815. Split initialize and start implementations for
the various Inputs and Outputs. (sseth)
Updated Branches:
refs/heads/master 924ce6611 -> 3a63d9b1a
TEZ-815. Split initialize and start implementations for the various
Inputs and Outputs. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3a63d9b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3a63d9b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3a63d9b1
Branch: refs/heads/master
Commit: 3a63d9b1a0f83cf366c7273760baa600755bc8c1
Parents: 924ce66
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 12 20:08:33 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 12 20:08:33 2014 -0800
----------------------------------------------------------------------
.../tez/runtime/api/MergedLogicalInput.java | 9 +-
.../apache/tez/runtime/api/TezTaskContext.java | 13 +++
.../org/apache/tez/mapreduce/input/MRInput.java | 7 +-
.../apache/tez/mapreduce/output/MROutput.java | 4 +-
.../runtime/api/impl/TezTaskContextImpl.java | 26 +++--
.../broadcast/input/BroadcastInputManager.java | 65 ++++++++---
.../input/BroadcastShuffleManager.java | 86 +++++++++-----
.../common/shuffle/impl/MergeManager.java | 114 +++++++++++++------
.../library/common/shuffle/impl/Shuffle.java | 50 +++++---
.../shuffle/impl/ShuffleInputEventHandler.java | 2 -
.../common/shuffle/impl/ShuffleScheduler.java | 2 +-
.../common/sort/impl/ExternalSorter.java | 31 +++--
.../common/sort/impl/PipelinedSorter.java | 8 +-
.../common/sort/impl/dflt/DefaultSorter.java | 8 +-
.../runtime/library/input/LocalMergedInput.java | 7 +-
.../library/input/ShuffledMergedInput.java | 22 +---
.../library/input/ShuffledUnorderedKVInput.java | 13 +--
.../library/output/LocalOnFileSorterOutput.java | 2 +-
.../library/output/OnFileSortedOutput.java | 6 +-
.../library/output/OnFileUnorderedKVOutput.java | 6 +-
.../input/TestBroadcastInputManager.java | 12 +-
.../java/org/apache/tez/test/TestInput.java | 8 +-
.../java/org/apache/tez/test/TestOutput.java | 8 +-
23 files changed, 333 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index d91a863..405e0d8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime.api;
import java.util.List;
+import com.google.common.collect.Lists;
/**
* A LogicalInput that is used to merge the data from multiple inputs and provide a
@@ -45,10 +46,14 @@ public abstract class MergedLogicalInput implements LogicalInput {
@Override
public List<Event> start() throws Exception {
+ List<Event> events = Lists.newLinkedList();
for (Input input : inputs) {
- input.start();
+ List<Event> inputEvents = input.start();
+ if (inputEvents != null) {
+ events.addAll(inputEvents);
+ }
}
- return null;
+ return events;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
index 70ef41d..6a7154c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
@@ -147,10 +147,23 @@ public interface TezTaskContext {
* This method can be called only once by any component. Calling it multiple
* times from within the same component will result in an error.
*
+ * Each Input / Output must request memory. For Inputs / Outputs which do not
+ * have a specific ask, a null callback handler can be specified with a
+ * request size of 0.
+ *
* @param size
* request size in bytes.
* @param callbackHandler
* the callback handler to be invoked once memory is assigned
*/
public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler);
+
+ /**
+ * Gets the total memory available to all components of the running task. This
+ * value is always constant, and does not factor in any allocations.
+ *
+ * @return the total available memory for all components of the task
+ */
+ public long getTotalMemoryAvailableToTask();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index f8f99b7..7108d37 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -18,6 +18,7 @@
package org.apache.tez.mapreduce.input;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -120,8 +121,7 @@ public class MRInput implements LogicalInput {
@Override
public List<Event> initialize(TezInputContext inputContext) throws IOException {
this.inputContext = inputContext;
- // TEZ 815. Fix this. Not used until there's a separation between init and start.
- inputContext.requestInitialMemory(0l, null);
+ this.inputContext.requestInitialMemory(0l, null); //mandatory call
MRInputUserPayloadProto mrUserPayload =
MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
@@ -160,8 +160,7 @@ public class MRInput implements LogicalInput {
@Override
public List<Event> start() {
- // TODO TEZ-815 To be fixed in a subsequent jira if required.
- return null;
+ return Collections.emptyList();
}
@Private
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 66d4566..2d7b48a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -90,8 +90,7 @@ public class MROutput implements LogicalOutput {
public List<Event> initialize(TezOutputContext outputContext)
throws IOException, InterruptedException {
LOG.info("Initializing Simple Output");
- // TEZ 815. Fix this. Not used until there's a separation between init and start.
- outputContext.requestInitialMemory(0l, null);
+ outputContext.requestInitialMemory(0l, null); //mandatory call
taskNumberFormat.setMinimumIntegerDigits(5);
taskNumberFormat.setGroupingUsed(false);
nonTaskNumberFormat.setMinimumIntegerDigits(3);
@@ -193,7 +192,6 @@ public class MROutput implements LogicalOutput {
@Override
public List<Event> start() {
- // TODO TEZ-815 To be fixed in a subsequent jira if required.
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index c31e903..e15f6f2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -37,6 +37,8 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezTaskContext;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
+import com.google.common.base.Preconditions;
+
public abstract class TezTaskContextImpl implements TezTaskContext {
private static final AtomicInteger ID_GEN = new AtomicInteger(10000);
@@ -148,15 +150,25 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
@Override
public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler) {
- this.initialMemoryDistributor.requestMemory(size, new MemoryUpdateCallback() {
- @Override
- public void memoryAssigned(long assignedSize) {
- // Filler for this patch.
- // TODO TEZ-815 implement this properly.
- }
- }, this, descriptor);
+ // A null callback is allowed (only with a request size of 0) since all IOs have to make this call.
+ if (callbackHandler == null) {
+ Preconditions.checkArgument(size == 0,
+ "A Null callback handler can only be used with a request size of 0");
+ callbackHandler = new MemoryUpdateCallback() {
+ @Override
+ public void memoryAssigned(long assignedSize) {
+
+ }
+ };
+ }
+ this.initialMemoryDistributor.requestMemory(size, callbackHandler, this, this.descriptor);
}
+ @Override
+ public long getTotalMemoryAvailableToTask() {
+ return Runtime.getRuntime().maxMemory();
+ }
+
protected void signalFatalError(Throwable t, String message,
EventMetaData sourceInfo) {
runtimeTask.setFatalError(t, message);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index 6c5a3da..4af4404 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -22,11 +22,11 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
@@ -36,25 +36,43 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+import com.google.common.base.Preconditions;
+
+/**
+ * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
+ *
+ */
+@Private
public class BroadcastInputManager implements FetchedInputAllocator,
FetchedInputCallback {
private static final Log LOG = LogFactory.getLog(BroadcastInputManager.class);
private final Configuration conf;
+ private final String uniqueIdentifier;
- private final TezTaskOutputFiles fileNameAllocator;
- private final LocalDirAllocator localDirAllocator;
+ private TezTaskOutputFiles fileNameAllocator;
+ private LocalDirAllocator localDirAllocator;
// Configuration parameters
- private final long memoryLimit;
- private final long maxSingleShuffleLimit;
+ private long memoryLimit;
+ private long maxSingleShuffleLimit;
private volatile long usedMemory = 0;
+
+ private long maxAvailableTaskMemory;
+ private long initialMemoryAvailable =-1l;
- public BroadcastInputManager(String uniqueIdentifier, Configuration conf, TezInputContext inputContext) {
- this.conf = conf;
+ public BroadcastInputManager(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory) {
+ this.conf = conf;
+ this.uniqueIdentifier = uniqueIdentifier;
+ this.maxAvailableTaskMemory = maxTaskAvailableMemory;
+ }
+ @Private
+ void configureAndStart() {
+ Preconditions.checkState(initialMemoryAvailable != -1,
+ "Initial memory must be configured before starting");
this.fileNameAllocator = new TezTaskOutputFiles(conf,
uniqueIdentifier);
this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
@@ -71,13 +89,14 @@ public class BroadcastInputManager implements FetchedInputAllocator,
// Allow unit tests to fix Runtime memory
long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
- Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
+ Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
- // TEZ 815. Fix this. Not used until there's a separation between init and start.
- inputContext.requestInitialMemory(memReq, null);
- long memAlloc = memReq;
-
- this.memoryLimit = memAlloc;
+ if (memReq <= this.initialMemoryAvailable) {
+ this.memoryLimit = memReq;
+ } else {
+ this.memoryLimit = initialMemoryAvailable;
+ }
+
LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
final float singleShuffleMemoryLimitPercent = conf.getFloat(
@@ -95,6 +114,26 @@ public class BroadcastInputManager implements FetchedInputAllocator,
LOG.info("BroadcastInputManager -> " + "MemoryLimit: " +
this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
}
+
+ @Private
+ static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
+ final float maxInMemCopyUse = conf.getFloat(
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+ if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+ throw new IllegalArgumentException("Invalid value for "
+ + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+ + maxInMemCopyUse);
+ }
+ long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+ Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+ return memReq;
+ }
+
+ @Private
+ void setInitialMemoryAvailable(long available) {
+ this.initialMemoryAvailable = available;
+ }
@Override
public synchronized FetchedInput allocate(long actualSize, long compressedSize,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index e4c31cf..bc17496 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -49,6 +49,7 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -64,6 +65,7 @@ import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
import org.apache.tez.runtime.library.shuffle.common.InputHost;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -72,51 +74,51 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-public class BroadcastShuffleManager implements FetcherCallback {
+public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCallback {
private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
- private TezInputContext inputContext;
- private int numInputs;
- private Configuration conf;
+ private final TezInputContext inputContext;
+ private final Configuration conf;
+ private final int numInputs;
- private final BroadcastShuffleInputEventHandler inputEventHandler;
- private final FetchedInputAllocator inputManager;
+ private BroadcastShuffleInputEventHandler inputEventHandler;
+ private FetchedInputAllocator inputManager;
- private final ExecutorService fetcherRawExecutor;
- private final ListeningExecutorService fetcherExecutor;
+ private ExecutorService fetcherRawExecutor;
+ private ListeningExecutorService fetcherExecutor;
- private final ExecutorService schedulerRawExecutor;
- private final ListeningExecutorService schedulerExecutor;
- private final RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
+ private ExecutorService schedulerRawExecutor;
+ private ListeningExecutorService schedulerExecutor;
+ private RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
- private final BlockingQueue<FetchedInput> completedInputs;
- private final Set<InputIdentifier> completedInputSet;
- private final ConcurrentMap<String, InputHost> knownSrcHosts;
- private final BlockingQueue<InputHost> pendingHosts;
- private final Set<InputAttemptIdentifier> obsoletedInputs;
+ private BlockingQueue<FetchedInput> completedInputs;
+ private Set<InputIdentifier> completedInputSet;
+ private ConcurrentMap<String, InputHost> knownSrcHosts;
+ private BlockingQueue<InputHost> pendingHosts;
+ private Set<InputAttemptIdentifier> obsoletedInputs;
- private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
+ private AtomicInteger numCompletedInputs = new AtomicInteger(0);
- private final long startTime;
+ private long startTime;
private long lastProgressTime;
// Required to be held when manipulating pendingHosts
private ReentrantLock lock = new ReentrantLock();
private Condition wakeLoop = lock.newCondition();
- private final int numFetchers;
- private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
+ private int numFetchers;
+ private AtomicInteger numRunningFetchers = new AtomicInteger(0);
// Parameters required by Fetchers
- private final SecretKey shuffleSecret;
- private final int connectionTimeout;
- private final int readTimeout;
- private final CompressionCodec codec;
+ private SecretKey shuffleSecret;
+ private int connectionTimeout;
+ private int readTimeout;
+ private CompressionCodec codec;
- private final boolean ifileReadAhead;
- private final int ifileReadAheadLength;
- private final int ifileBufferSize;
+ private boolean ifileReadAhead;
+ private int ifileReadAheadLength;
+ private int ifileBufferSize;
private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
@@ -124,13 +126,21 @@ public class BroadcastShuffleManager implements FetcherCallback {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private volatile long initialMemoryAvailable = -1l;
+
// TODO NEWTEZ Add counters.
public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
this.inputContext = inputContext;
this.conf = conf;
this.numInputs = numInputs;
+ long initalMemReq = getInitialMemoryReq();
+ this.inputContext.requestInitialMemory(initalMemReq, this);
+ }
+ private void configureAndStart() throws IOException {
+ Preconditions.checkState(initialMemoryAvailable != -1,
+ "Initial memory available must be configured before starting");
if (ConfigUtils.isIntermediateInputCompressed(conf)) {
Class<? extends CompressionCodec> codecClass = ConfigUtils
.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -152,7 +162,10 @@ public class BroadcastShuffleManager implements FetcherCallback {
this.ifileBufferSize = conf.getInt("io.file.buffer.size",
TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
- this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf, inputContext);
+ this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf,
+ inputContext.getTotalMemoryAvailableToTask());
+ ((BroadcastInputManager)this.inputManager).setInitialMemoryAvailable(initialMemoryAvailable);
+ ((BroadcastInputManager)this.inputManager).configureAndStart();
this.inputEventHandler = new BroadcastShuffleInputEventHandler(
inputContext, this, this.inputManager, codec, ifileReadAhead,
ifileReadAheadLength);
@@ -208,7 +221,17 @@ public class BroadcastShuffleManager implements FetcherCallback {
.getName()) + ", numFetchers: " + numFetchers);
}
- public void run() {
+ private long getInitialMemoryReq() {
+ return BroadcastInputManager.getInitialMemoryReq(conf,
+ inputContext.getTotalMemoryAvailableToTask());
+ }
+
+ public void setInitialMemoryAvailable(long available) {
+ this.initialMemoryAvailable = available;
+ }
+
+ public void run() throws IOException {
+ configureAndStart();
ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
// Shutdown this executor once this task, and the callback complete.
@@ -673,4 +696,9 @@ public class BroadcastShuffleManager implements FetcherCallback {
}
}
}
+
+ @Override
+ public void memoryAssigned(long assignedSize) {
+ this.initialMemoryAvailable = assignedSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 37808c3..051806c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -59,6 +59,12 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
+import com.google.common.base.Preconditions;
+
+/**
+ * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
+ *
+ */
@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings(value={"rawtypes"})
@@ -77,24 +83,26 @@ public class MergeManager {
Set<MapOutput> inMemoryMergedMapOutputs =
new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
- private final IntermediateMemoryToMemoryMerger memToMemMerger;
+ private IntermediateMemoryToMemoryMerger memToMemMerger;
Set<MapOutput> inMemoryMapOutputs =
new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
- private final InMemoryMerger inMemoryMerger;
+ private InMemoryMerger inMemoryMerger;
Set<Path> onDiskMapOutputs = new TreeSet<Path>();
- private final OnDiskMerger onDiskMerger;
+ private OnDiskMerger onDiskMerger;
- private final long memoryLimit;
+ private long memoryLimit;
+ private int postMergeMemLimit;
private long usedMemory;
private long commitMemory;
- private final long maxSingleShuffleLimit;
+ private int ioSortFactor;
+ private long maxSingleShuffleLimit;
- private final int memToMemMergeOutputsThreshold;
- private final long mergeThreshold;
+ private int memToMemMergeOutputsThreshold;
+ private long mergeThreshold;
- private final int ioSortFactor;
+ private long initialMemoryAvailable = -1;
private final ExceptionReporter exceptionReporter;
@@ -106,16 +114,18 @@ public class MergeManager {
private final TezCounter mergedMapOutputsCounter;
- private final CompressionCodec codec;
+ private CompressionCodec codec;
private volatile boolean finalMergeComplete = false;
- private final boolean ifileReadAhead;
- private final int ifileReadAheadLength;
- private final int ifileBufferSize;
+ private boolean ifileReadAhead;
+ private int ifileReadAheadLength;
+ private int ifileBufferSize;
- private final int postMergeMemLimit;
-
+
+ /**
+ * Construct the MergeManager. setInitialMemoryAvailable(long) and configureAndStart() must be called before it becomes usable.
+ */
public MergeManager(Configuration conf,
FileSystem localFS,
LocalDirAllocator localDirAllocator,
@@ -140,6 +150,16 @@ public class MergeManager {
this.localFS = localFS;
this.rfs = ((LocalFileSystem)localFS).getRaw();
+ }
+
+ void setInitialMemoryAvailable(long available) {
+ this.initialMemoryAvailable = available;
+ }
+
+ @Private
+ void configureAndStart() {
+ Preconditions.checkState(initialMemoryAvailable != -1,
+ "Initial available memory must be configured before starting");
if (ConfigUtils.isIntermediateInputCompressed(conf)) {
Class<? extends CompressionCodec> codecClass =
ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -160,6 +180,7 @@ public class MergeManager {
this.ifileBufferSize = conf.getInt("io.file.buffer.size",
TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+ // Figure out initial memory req start
final float maxInMemCopyUse =
conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
@@ -172,9 +193,7 @@ public class MergeManager {
// Allow unit tests to fix Runtime memory
long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
- Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
-
- LOG.info("Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
+ Math.min(inputContext.getTotalMemoryAvailableToTask(), Integer.MAX_VALUE)) * maxInMemCopyUse);
float maxRedPer = conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
@@ -182,29 +201,25 @@ public class MergeManager {
throw new TezUncheckedException(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT + maxRedPer);
}
// TODO maxRedBuffer should be a long.
- int maxRedBuffer = (int) Math.min(Runtime.getRuntime().maxMemory() * maxRedPer,
+ int maxRedBuffer = (int) Math.min(inputContext.getTotalMemoryAvailableToTask() * maxRedPer,
Integer.MAX_VALUE);
- LOG.info("Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
-
- long reqMem = Math.max(maxRedBuffer, memLimit);
- // TEZ 815. Fix this. Not used until there's a separation between init and start.
- inputContext.requestInitialMemory(reqMem, null);
- long availableMem = reqMem;
+ // Figure out initial memory req end
- if (availableMem < memLimit) {
- this.memoryLimit = availableMem;
+ if (this.initialMemoryAvailable < memLimit) {
+ this.memoryLimit = this.initialMemoryAvailable;
} else {
this.memoryLimit = memLimit;
}
-
- if (availableMem < maxRedBuffer) {
- this.postMergeMemLimit = (int) availableMem;
+
+ if (this.initialMemoryAvailable < maxRedBuffer) {
+ this.postMergeMemLimit = (int) this.initialMemoryAvailable;
} else {
this.postMergeMemLimit = maxRedBuffer;
}
-
- LOG.info("FinalMemoryAllocation: ShuffleMemory=" + this.memoryLimit + ", postMergeMem: "
- + this.postMergeMemLimit);
+
+ LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
+ + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem="
+ + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
this.ioSortFactor =
conf.getInt(
@@ -265,6 +280,41 @@ public class MergeManager {
this.onDiskMerger = new OnDiskMerger(this);
this.onDiskMerger.start();
}
+
+ /**
+ * Exposed so that the initial memory request can be computed without instantiating the object.
+ */
+ @Private
+ static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+ final float maxInMemCopyUse =
+ conf.getFloat(
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+ if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+ throw new IllegalArgumentException("Invalid value for " +
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
+ maxInMemCopyUse);
+ }
+
+ // Allow unit tests to fix Runtime memory
+ long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+ Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+
+ LOG.info("Initial Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
+
+ float maxRedPer = conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
+ if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+ throw new TezUncheckedException(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT + maxRedPer);
+ }
+ // TODO maxRedBuffer should be a long.
+ int maxRedBuffer = (int) Math.min(maxAvailableTaskMemory * maxRedPer,
+ Integer.MAX_VALUE);
+ LOG.info("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
+
+ long reqMem = Math.max(maxRedBuffer, memLimit);
+ return reqMem;
+ }
public void waitForInMemoryMerge() throws InterruptedException {
inMemoryMerger.waitForMerge();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 80f6627..1959533 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -42,6 +42,7 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -51,38 +52,53 @@ import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import com.google.common.base.Preconditions;
+/**
+ * Usage: Create instance, setInitialMemoryAllocated(long), run()
+ *
+ */
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class Shuffle implements ExceptionReporter {
+public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
private static final Log LOG = LogFactory.getLog(Shuffle.class);
private static final int PROGRESS_FREQUENCY = 2000;
private final Configuration conf;
private final TezInputContext inputContext;
- private final ShuffleClientMetrics metrics;
+ private final int numInputs;
+
+ private ShuffleClientMetrics metrics;
- private final ShuffleInputEventHandler eventHandler;
- private final ShuffleScheduler scheduler;
- private final MergeManager merger;
+ private ShuffleInputEventHandler eventHandler;
+ private ShuffleScheduler scheduler;
+ private MergeManager merger;
private Throwable throwable = null;
private String throwingThreadName = null;
- private final int numInputs;
- private final SecretKey jobTokenSecret;
- private final CompressionCodec codec;
- private final boolean ifileReadAhead;
- private final int ifileReadAheadLength;
+
+ private SecretKey jobTokenSecret;
+ private CompressionCodec codec;
+ private boolean ifileReadAhead;
+ private int ifileReadAheadLength;
+
+ private volatile long initialMemoryAvailable = -1;
private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
this.inputContext = inputContext;
this.conf = conf;
+ this.numInputs = numInputs;
+ long initialMemRequested = MergeManager.getInitialMemoryRequirement(conf,
+ inputContext.getTotalMemoryAvailableToTask());
+ inputContext.requestInitialMemory(initialMemRequested, this);
+ }
+
+ private void configureAndStart() throws IOException {
+ Preconditions.checkState(initialMemoryAvailable != -1,
+ "Initial Available memory must be configured before starting");
this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
-
- this.numInputs = numInputs;
this.jobTokenSecret = ShuffleUtils
.getJobTokenSecretFromTokenBytes(inputContext
@@ -151,6 +167,8 @@ public class Shuffle implements ExceptionReporter {
reduceCombineInputCounter,
mergedMapOutputsCounter,
this);
+ merger.setInitialMemoryAvailable(initialMemoryAvailable);
+ merger.configureAndStart();
}
public void handleEvents(List<Event> events) {
@@ -195,7 +213,8 @@ public class Shuffle implements ExceptionReporter {
return kvIter;
}
- public void run() {
+ public void run() throws IOException {
+ configureAndStart();
RunShuffleCallable runShuffle = new RunShuffleCallable();
runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
@@ -276,4 +295,9 @@ public class Shuffle implements ExceptionReporter {
}
}
+ @Override
+ public void memoryAssigned(long assignedSize) {
+ this.initialMemoryAvailable = assignedSize;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 3319752..2dcabe1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -64,8 +64,6 @@ public class ShuffleInputEventHandler {
}
private void processDataMovementEvent(DataMovementEvent dmEvent) {
- // FIXME TODO NEWTEZ
- // Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
DataMovementEventPayloadProto shufflePayload;
try {
shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index db4c794..3ff7d6b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -113,9 +113,9 @@ class ShuffleScheduler {
this.failedShuffleCounter = failedShuffleCounter;
this.startTime = System.currentTimeMillis();
this.lastProgressTime = startTime;
- referee.start();
this.maxFailedUniqueFetches = Math.min(numberOfInputs,
this.maxFailedUniqueFetches);
+ referee.start();
this.maxFetchFailuresBeforeReporting =
conf.getInt(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index e8a78fa..78e55c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -53,7 +54,7 @@ import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
@SuppressWarnings({"unchecked", "rawtypes"})
-public abstract class ExternalSorter {
+public abstract class ExternalSorter implements MemoryUpdateCallback {
private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
@@ -81,8 +82,8 @@ public abstract class ExternalSorter {
protected boolean ifileReadAhead;
protected int ifileReadAheadLength;
protected int ifileBufferSize;
-
- protected int availableMemory;
+
+ protected volatile int availableMemoryMb;
protected IndexedSorter sorter;
@@ -96,6 +97,7 @@ public abstract class ExternalSorter {
protected TezCounter fileOutputByteCounter;
protected TezCounter spilledRecordsCounter;
+ @Private
public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
this.outputContext = outputContext;
this.conf = conf;
@@ -108,14 +110,9 @@ public abstract class ExternalSorter {
TezJobConfig.TEZ_RUNTIME_IO_SORT_MB,
TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
long reqBytes = reqMemory << 20;
- // TEZ 815. Fix this. Not used until there's a separation between init and start.
- outputContext.requestInitialMemory(reqBytes, null);
- long availBytes = reqBytes;
-
- this.availableMemory = (int) (availBytes >> 20);
-
- LOG.info("io.sort.mb requested: " + reqMemory + ", Allocated: " + availableMemory);
-
+ outputContext.requestInitialMemory(reqBytes, this);
+ LOG.info("Requested SortBufferSize (io.sort.mb): " + reqMemory);
+
// sorter
sorter = ReflectionUtils.newInstance(this.conf.getClass(
TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, QuickSort.class,
@@ -170,6 +167,13 @@ public abstract class ExternalSorter {
this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
}
+
+ /**
+ * Starts the sorter after its memory has been assigned. Typically this
+ * involves allocating sort buffers, starting required threads, etc.
+ */
+ @Private
+ public abstract void start() throws Exception;
/**
* Exception indicating that the allocated sort buffer is insufficient to hold
@@ -224,4 +228,9 @@ public abstract class ExternalSorter {
public ShuffleHeader getShuffleHeader(int reduce) {
throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
}
+
+ @Override
+ public void memoryAssigned(long assignedSize) {
+ this.availableMemoryMb = (int) (assignedSize >> 20);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 3237522..6bfa098 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -34,7 +34,6 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@@ -44,7 +43,6 @@ import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
@@ -92,8 +90,8 @@ public class PipelinedSorter extends ExternalSorter {
private int totalIndexCacheMemory;
private int indexCacheMemoryLimit;
- public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
- super.initialize(outputContext, conf, numOutputs);
+ @Override
+ public void start() throws IOException {
partitionBits = bitcount(partitions)+1;
@@ -102,7 +100,7 @@ public class PipelinedSorter extends ExternalSorter {
this.conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
- final int sortmb = this.availableMemory;
+ final int sortmb = this.availableMemoryMb;
indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezJobConfig.DEFAULT_TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index e831dec..9079473 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@@ -40,7 +39,6 @@ import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -111,14 +109,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
private int indexCacheMemoryLimit;
@Override
- public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
- super.initialize(outputContext, conf, numOutputs);
-
+ public void start() throws IOException {
// sanity checks
final float spillper = this.conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
- final int sortmb = this.availableMemory;
+ final int sortmb = this.availableMemoryMb;
if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
throw new IOException("Invalid \""
+ TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT + "\": " + spillper);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index b8191db..4d2441c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -36,7 +36,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
@Override
public List<Event> initialize(TezInputContext inputContext) throws IOException {
this.inputContext = inputContext;
- this.inputContext.requestInitialMemory(0l, null); // mandatory call. Fix in TEZ-815
+ this.inputContext.requestInitialMemory(0l, null); // mandatory call.
this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
if (numInputs == 0) {
@@ -48,6 +48,11 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
createValuesIterator();
return Collections.emptyList();
}
+
+ @Override
+ public List<Event> start() throws IOException {
+ return Collections.emptyList();
+ }
@Override
public List<Event> close() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 9b9cfaf..bee7e45 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -38,6 +38,7 @@ import org.apache.tez.runtime.library.common.ValuesIterator;
import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
/**
* <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
* intermediate sorted data, merges them and provides key/<values> to the
@@ -78,17 +79,15 @@ public class ShuffledMergedInput implements LogicalInput {
this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
inputContext.getWorkDirs());
- // Start the shuffle - copy and merge.
shuffle = new Shuffle(inputContext, this.conf, numInputs);
- shuffle.run();
-
return Collections.emptyList();
}
@Override
- public List<Event> start() {
- // TODO TEZ-815 To be fixed in a subsequent jira if required.
- return null;
+ public List<Event> start() throws IOException {
+ // Start the shuffle - copy and merge
+ shuffle.run();
+ return Collections.emptyList();
}
/**
@@ -207,15 +206,4 @@ public class ShuffledMergedInput implements LogicalInput {
}
- // This functionality is currently broken. If there's inputs which need to be
- // written to disk, there's a possibility that inputs from the different
- // sources could clobber each others' output. Also the current structures do
- // not have adequate information to de-dupe these (vertex name)
-// public void mergeWith(ShuffledMergedInput other) {
-// this.numInputs += other.getNumPhysicalInputs();
-// }
-//
-// public int getNumPhysicalInputs() {
-// return this.numInputs;
-// }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 8535c58..61dff6f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime.library.input;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -34,7 +35,6 @@ import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
import com.google.common.base.Preconditions;
-
public class ShuffledUnorderedKVInput implements LogicalInput {
private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
@@ -60,15 +60,14 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
this.shuffleManager = new BroadcastShuffleManager(inputContext, conf,
numInputs);
- this.shuffleManager.run();
- this.kvReader = this.shuffleManager.createReader();
- return null;
+ return Collections.emptyList();
}
@Override
- public List<Event> start() {
- // TODO TEZ-815 To be fixed in a subsequent jira if required.
- return null;
+ public List<Event> start() throws IOException {
+ this.shuffleManager.run();
+ this.kvReader = this.shuffleManager.createReader();
+ return Collections.emptyList();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
index c8a8233..d7e017a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -32,7 +32,7 @@ public class LocalOnFileSorterOutput extends OnFileSortedOutput {
private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
-
+
@Override
public List<Event> close() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index c9a12b2..38f6bf9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -80,9 +80,9 @@ public class OnFileSortedOutput implements LogicalOutput {
}
@Override
- public List<Event> start() {
- // TODO TEZ-815 To be fixed in a subsequent jira if required.
- return null;
+ public List<Event> start() throws Exception {
+ sorter.start();
+ return Collections.emptyList();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index eb80940..04d638f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -69,8 +69,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
outputContext.getWorkDirs());
- // TEZ 815. Fix this. Not used until there's a separation between init and start.
- this.outputContext.requestInitialMemory(0l, null);
+ this.outputContext.requestInitialMemory(0l, null); // mandatory call
this.dataViaEventsEnabled = conf.getBoolean(
TezJobConfig.TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED,
@@ -89,8 +88,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
@Override
public List<Event> start() {
- // TODO TEZ-815 To be fixed in a subsequent jira if required.
- return null;
+ return Collections.emptyList();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
index b502b1d..f07c5ac 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
@@ -19,7 +19,6 @@
package org.apache.tez.runtime.library.broadcast.input;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.UUID;
@@ -28,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.junit.Test;
@@ -52,12 +50,12 @@ public class TestBroadcastInputManager {
long inMemThreshold = (long) (bufferPercent * jvmMax);
LOG.info("InMemThreshold: " + inMemThreshold);
-
- TezInputContext mockInputContext = mock(TezInputContext.class);
+
BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(),
- conf, mockInputContext);
-
-
+ conf, Runtime.getRuntime().maxMemory());
+ inputManager.setInitialMemoryAvailable(inMemThreshold);
+ inputManager.configureAndStart();
+
long requestSize = (long) (0.4f * inMemThreshold);
long compressedSize = 1l;
LOG.info("RequestSize: " + requestSize);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 3063ec2..5ed9faa 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -18,6 +18,7 @@
package org.apache.tez.test;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -188,7 +189,7 @@ public class TestInput implements LogicalInput {
@Override
public List<Event> initialize(TezInputContext inputContext) throws Exception {
this.inputContext = inputContext;
- this.inputContext.requestInitialMemory(0l, null); //Mandatory call. Fix null in TEZ-815.
+ this.inputContext.requestInitialMemory(0l, null); //Mandatory call.
if (inputContext.getUserPayload() != null) {
String vName = inputContext.getTaskVertexName();
conf = MRHelpers.createConfFromUserPayload(inputContext.getUserPayload());
@@ -220,13 +221,12 @@ public class TestInput implements LogicalInput {
}
}
}
- return null;
+ return Collections.emptyList();
}
@Override
public List<Event> start() {
- // TODO TEZ-815 To be fixed in a subsequent jira if required.
- return null;
+ return Collections.emptyList();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 5f2edae..ddd2a29 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -18,6 +18,7 @@
package org.apache.tez.test;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -46,14 +47,13 @@ public class TestOutput implements LogicalOutput {
public List<Event> initialize(TezOutputContext outputContext)
throws Exception {
this.outputContext = outputContext;
- this.outputContext.requestInitialMemory(0l, null); //Mandatory call. Fix null in TEZ-815.
- return null;
+ this.outputContext.requestInitialMemory(0l, null); //Mandatory call
+ return Collections.emptyList();
}
@Override
public List<Event> start() {
- // TODO TEZ-815 To be fixed in a subsequent jira if required.
- return null;
+ return Collections.emptyList();
}
@Override