You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/06 23:29:16 UTC
[2/2] git commit: TEZ-911. Re-factor BroadcastShuffle related code to
be independent of Broadcast. (sseth)
TEZ-911. Re-factor BroadcastShuffle related code to be independent of
Broadcast. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0df10815
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0df10815
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0df10815
Branch: refs/heads/master
Commit: 0df108154743ff689e3ba4ff2671e6c809eeb660
Parents: 351a610
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 6 14:28:22 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 6 14:28:22 2014 -0800
----------------------------------------------------------------------
.../broadcast/input/BroadcastInputManager.java | 194 -----
.../broadcast/input/BroadcastKVReader.java | 197 -----
.../BroadcastShuffleInputEventHandler.java | 9 +-
.../input/BroadcastShuffleManager.java | 740 -------------------
.../readers/ShuffledUnorderedKVReader.java | 198 +++++
.../library/input/ShuffledUnorderedKVInput.java | 103 ++-
.../shuffle/common/ShuffleEventHandler.java | 28 +
.../shuffle/common/impl/ShuffleManager.java | 706 ++++++++++++++++++
.../impl/SimpleFetchedInputAllocator.java | 194 +++++
.../input/TestBroadcastInputManager.java | 89 ---
.../impl/TestSimpleFetchedInputAllocator.java | 90 +++
11 files changed, 1314 insertions(+), 1234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
deleted file mode 100644
index 4af4404..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
- *
- */
-@Private
-public class BroadcastInputManager implements FetchedInputAllocator,
- FetchedInputCallback {
-
- private static final Log LOG = LogFactory.getLog(BroadcastInputManager.class);
-
- private final Configuration conf;
- private final String uniqueIdentifier;
-
- private TezTaskOutputFiles fileNameAllocator;
- private LocalDirAllocator localDirAllocator;
-
- // Configuration parameters
- private long memoryLimit;
- private long maxSingleShuffleLimit;
-
- private volatile long usedMemory = 0;
-
- private long maxAvailableTaskMemory;
- private long initialMemoryAvailable =-1l;
-
- public BroadcastInputManager(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory) {
- this.conf = conf;
- this.uniqueIdentifier = uniqueIdentifier;
- this.maxAvailableTaskMemory = maxTaskAvailableMemory;
- }
-
- @Private
- void configureAndStart() {
- Preconditions.checkState(initialMemoryAvailable != -1,
- "Initial memory must be configured before starting");
- this.fileNameAllocator = new TezTaskOutputFiles(conf,
- uniqueIdentifier);
- this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
- // Setup configuration
- final float maxInMemCopyUse = conf.getFloat(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
- if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
- throw new IllegalArgumentException("Invalid value for "
- + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
- + maxInMemCopyUse);
- }
-
- // Allow unit tests to fix Runtime memory
- long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
- Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
-
- if (memReq <= this.initialMemoryAvailable) {
- this.memoryLimit = memReq;
- } else {
- this.memoryLimit = initialMemoryAvailable;
- }
-
- LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
-
- final float singleShuffleMemoryLimitPercent = conf.getFloat(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
- if (singleShuffleMemoryLimitPercent <= 0.0f
- || singleShuffleMemoryLimitPercent > 1.0f) {
- throw new IllegalArgumentException("Invalid value for "
- + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
- + singleShuffleMemoryLimitPercent);
- }
-
- this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
-
- LOG.info("BroadcastInputManager -> " + "MemoryLimit: " +
- this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
- }
-
- @Private
- static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
- final float maxInMemCopyUse = conf.getFloat(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
- if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
- throw new IllegalArgumentException("Invalid value for "
- + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
- + maxInMemCopyUse);
- }
- long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
- Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
- return memReq;
- }
-
- @Private
- void setInitialMemoryAvailable(long available) {
- this.initialMemoryAvailable = available;
- }
-
- @Override
- public synchronized FetchedInput allocate(long actualSize, long compressedSize,
- InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
- if (actualSize > maxSingleShuffleLimit
- || this.usedMemory + actualSize > this.memoryLimit) {
- return new DiskFetchedInput(actualSize, compressedSize,
- inputAttemptIdentifier, this, conf, localDirAllocator,
- fileNameAllocator);
- } else {
- this.usedMemory += actualSize;
- LOG.info("Used memory after allocating " + actualSize + " : " + usedMemory);
- return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
- }
- }
-
- @Override
- public synchronized void fetchComplete(FetchedInput fetchedInput) {
- switch (fetchedInput.getType()) {
- // Not tracking anything here.
- case DISK:
- case MEMORY:
- break;
- default:
- throw new TezUncheckedException("InputType: " + fetchedInput.getType()
- + " not expected for Broadcast fetch");
- }
- }
-
- @Override
- public synchronized void fetchFailed(FetchedInput fetchedInput) {
- cleanup(fetchedInput);
- }
-
- @Override
- public synchronized void freeResources(FetchedInput fetchedInput) {
- cleanup(fetchedInput);
- }
-
- private void cleanup(FetchedInput fetchedInput) {
- switch (fetchedInput.getType()) {
- case DISK:
- break;
- case MEMORY:
- unreserve(fetchedInput.getActualSize());
- break;
- default:
- throw new TezUncheckedException("InputType: " + fetchedInput.getType()
- + " not expected for Broadcast fetch");
- }
- }
-
- private synchronized void unreserve(long size) {
- this.usedMemory -= size;
- LOG.info("Used memory after freeing " + size + " : " + usedMemory);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
deleted file mode 100644
index 2354257..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-
-public class BroadcastKVReader<K, V> implements KeyValueReader {
-
- private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
-
- private final BroadcastShuffleManager shuffleManager;
- private final CompressionCodec codec;
-
- private final Class<K> keyClass;
- private final Class<V> valClass;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valDeserializer;
- private final DataInputBuffer keyIn;
- private final DataInputBuffer valIn;
-
- private final boolean ifileReadAhead;
- private final int ifileReadAheadLength;
- private final int ifileBufferSize;
-
- private final TezCounter inputRecordCounter;
-
- private K key;
- private V value;
-
- private FetchedInput currentFetchedInput;
- private IFile.Reader currentReader;
-
- // TODO Remove this once per I/O counters are separated properly. Relying on
- // the counter at the moment will generate aggregate numbers.
- private int numRecordsRead = 0;
-
- public BroadcastKVReader(BroadcastShuffleManager shuffleManager, Configuration conf,
- CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
- TezCounter inputRecordCounter)
- throws IOException {
- this.shuffleManager = shuffleManager;
-
- this.codec = codec;
- this.ifileReadAhead = ifileReadAhead;
- this.ifileReadAheadLength = ifileReadAheadLength;
- this.ifileBufferSize = ifileBufferSize;
- this.inputRecordCounter = inputRecordCounter;
-
- this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
- this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
-
- this.keyIn = new DataInputBuffer();
- this.valIn = new DataInputBuffer();
-
- SerializationFactory serializationFactory = new SerializationFactory(conf);
-
- this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
- this.keyDeserializer.open(keyIn);
- this.valDeserializer = serializationFactory.getDeserializer(valClass);
- this.valDeserializer.open(valIn);
- }
-
- // TODO NEWTEZ Maybe add an interface to check whether next will block.
-
- /**
- * Moves to the next key/values(s) pair
- *
- * @return true if another key/value(s) pair exists, false if there are no
- * more.
- * @throws IOException
- * if an error occurs
- */
- @Override
- public boolean next() throws IOException {
- if (readNextFromCurrentReader()) {
- inputRecordCounter.increment(1);
- numRecordsRead++;
- return true;
- } else {
- boolean nextInputExists = moveToNextInput();
- while (nextInputExists) {
- if(readNextFromCurrentReader()) {
- inputRecordCounter.increment(1);
- numRecordsRead++;
- return true;
- }
- nextInputExists = moveToNextInput();
- }
- LOG.info("Num Records read: " + numRecordsRead);
- return false;
- }
- }
-
- @Override
- public Object getCurrentKey() throws IOException {
- return (Object) key;
- }
-
- @Override
- public Object getCurrentValue() throws IOException {
- return value;
- }
-
- /**
- * Tries reading the next key and value from the current reader.
- * @return true if the current reader has more records
- * @throws IOException
- */
- private boolean readNextFromCurrentReader() throws IOException {
- // Initial reader.
- if (this.currentReader == null) {
- return false;
- } else {
- boolean hasMore = this.currentReader.nextRawKey(keyIn);
- if (hasMore) {
- this.currentReader.nextRawValue(valIn);
- this.key = keyDeserializer.deserialize(this.key);
- this.value = valDeserializer.deserialize(this.value);
- return true;
- }
- return false;
- }
- }
-
- /**
- * Moves to the next available input. This method may block if the input is not ready yet.
- * Also takes care of closing the previous input.
- *
- * @return true if the next input exists, false otherwise
- * @throws IOException
- * @throws InterruptedException
- */
- private boolean moveToNextInput() throws IOException {
- if (currentReader != null) { // Close the current reader.
- currentReader.close();
- currentFetchedInput.free();
- }
- try {
- currentFetchedInput = shuffleManager.getNextInput();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for next available input", e);
- throw new IOException(e);
- }
- if (currentFetchedInput == null) {
- return false; // No more inputs
- } else {
- currentReader = openIFileReader(currentFetchedInput);
- return true;
- }
- }
-
- public IFile.Reader openIFileReader(FetchedInput fetchedInput)
- throws IOException {
- if (fetchedInput.getType() == Type.MEMORY) {
- MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
-
- return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
- mfi.getBytes(), 0, (int) mfi.getActualSize());
- } else {
- return new IFile.Reader(fetchedInput.getInputStream(),
- fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
- ifileReadAheadLength, ifileBufferSize);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index a7a12ef..68aa49f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -35,18 +35,20 @@ import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
-public class BroadcastShuffleInputEventHandler {
+public class BroadcastShuffleInputEventHandler implements ShuffleEventHandler {
private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
- private final BroadcastShuffleManager shuffleManager;
+ private final ShuffleManager shuffleManager;
private final FetchedInputAllocator inputAllocator;
private final CompressionCodec codec;
private final boolean ifileReadAhead;
@@ -54,7 +56,7 @@ public class BroadcastShuffleInputEventHandler {
public BroadcastShuffleInputEventHandler(TezInputContext inputContext,
- BroadcastShuffleManager shuffleManager,
+ ShuffleManager shuffleManager,
FetchedInputAllocator inputAllocator, CompressionCodec codec,
boolean ifileReadAhead, int ifileReadAheadLength) {
this.shuffleManager = shuffleManager;
@@ -64,6 +66,7 @@ public class BroadcastShuffleInputEventHandler {
this.ifileReadAheadLength = ifileReadAheadLength;
}
+ @Override
public void handleEvents(List<Event> events) throws IOException {
for (Event event : events) {
handleEvent(event);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
deleted file mode 100644
index ca58396..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ /dev/null
@@ -1,740 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.MemoryUpdateCallback;
-import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.shuffle.common.FetchResult;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.Fetcher;
-import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
-import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
-import org.apache.tez.runtime.library.shuffle.common.InputHost;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCallback {
-
- private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
-
- private final TezInputContext inputContext;
- private final Configuration conf;
- private final int numInputs;
-
- private BroadcastShuffleInputEventHandler inputEventHandler;
- private FetchedInputAllocator inputManager;
-
- private ExecutorService fetcherRawExecutor;
- private ListeningExecutorService fetcherExecutor;
-
- private ExecutorService schedulerRawExecutor;
- private ListeningExecutorService schedulerExecutor;
- private RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
-
- private BlockingQueue<FetchedInput> completedInputs;
- private AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
- private Set<InputIdentifier> completedInputSet;
- private ConcurrentMap<String, InputHost> knownSrcHosts;
- private BlockingQueue<InputHost> pendingHosts;
- private Set<InputAttemptIdentifier> obsoletedInputs;
-
- private AtomicInteger numCompletedInputs = new AtomicInteger(0);
-
- private long startTime;
- private long lastProgressTime;
-
- // Required to be held when manipulating pendingHosts
- private ReentrantLock lock = new ReentrantLock();
- private Condition wakeLoop = lock.newCondition();
-
- private int numFetchers;
- private AtomicInteger numRunningFetchers = new AtomicInteger(0);
-
- // Parameters required by Fetchers
- private SecretKey shuffleSecret;
- private int connectionTimeout;
- private int readTimeout;
- private CompressionCodec codec;
-
- private boolean ifileReadAhead;
- private int ifileReadAheadLength;
- private int ifileBufferSize;
-
- private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
-
- private volatile Throwable shuffleError;
-
- private final AtomicBoolean isShutdown = new AtomicBoolean(false);
-
- private volatile long initialMemoryAvailable = -1l;
-
- private final TezCounter shuffledInputsCounter;
- private final TezCounter failedShufflesCounter;
- private final TezCounter bytesShuffledCounter;
- private final TezCounter decompressedDataSizeCounter;
- private final TezCounter bytesShuffledToDiskCounter;
- private final TezCounter bytesShuffledToMemCounter;
-
- // TODO More counters - FetchErrors, speed?
-
- public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
- this.inputContext = inputContext;
- this.conf = conf;
- this.numInputs = numInputs;
- long initalMemReq = getInitialMemoryReq();
- this.inputContext.requestInitialMemory(initalMemReq, this);
-
- this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
- this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
- this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
- this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
- this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
- this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
- }
-
- private void configureAndStart() throws IOException {
- Preconditions.checkState(initialMemoryAvailable != -1,
- "Initial memory available must be configured before starting");
- if (ConfigUtils.isIntermediateInputCompressed(conf)) {
- Class<? extends CompressionCodec> codecClass = ConfigUtils
- .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- } else {
- codec = null;
- }
-
- this.ifileReadAhead = conf.getBoolean(
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
- if (this.ifileReadAhead) {
- this.ifileReadAheadLength = conf.getInt(
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
- TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
- } else {
- this.ifileReadAheadLength = 0;
- }
- this.ifileBufferSize = conf.getInt("io.file.buffer.size",
- TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
-
- this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf,
- inputContext.getTotalMemoryAvailableToTask());
- ((BroadcastInputManager)this.inputManager).setInitialMemoryAvailable(initialMemoryAvailable);
- ((BroadcastInputManager)this.inputManager).configureAndStart();
- this.inputEventHandler = new BroadcastShuffleInputEventHandler(
- inputContext, this, this.inputManager, codec, ifileReadAhead,
- ifileReadAheadLength);
-
- completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
- completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
- knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
- pendingHosts = new LinkedBlockingQueue<InputHost>();
- obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
-
- int maxConfiguredFetchers =
- conf.getInt(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
-
- this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
-
- this.fetcherRawExecutor = Executors.newFixedThreadPool(
- numFetchers,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(
- "Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d")
- .build());
- this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
-
- this.schedulerRawExecutor = Executors.newFixedThreadPool(
- 1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(
- "ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]")
- .build());
- this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
-
- this.startTime = System.currentTimeMillis();
- this.lastProgressTime = startTime;
-
- this.shuffleSecret = ShuffleUtils
- .getJobTokenSecretFromTokenBytes(inputContext
- .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
-
- this.connectionTimeout = conf.getInt(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
- this.readTimeout = conf.getInt(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
-
-
- LOG.info("BroadcastShuffleManager -> numInputs: " + numInputs
- + " compressionCodec: " + (codec == null ? "NoCompressionCodec" : codec.getClass()
- .getName()) + ", numFetchers: " + numFetchers);
- }
-
- private long getInitialMemoryReq() {
- return BroadcastInputManager.getInitialMemoryReq(conf,
- inputContext.getTotalMemoryAvailableToTask());
- }
-
- public void setInitialMemoryAvailable(long available) {
- this.initialMemoryAvailable = available;
- }
-
- public void run() throws IOException {
- configureAndStart();
- ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
- Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
- // Shutdown this executor once this task, and the callback complete.
- schedulerExecutor.shutdown();
- }
-
- /**
- * Scheduler loop. Each iteration waits on wakeLoop (under lock) while every fetcher slot is
- * busy or no host is pending, then hands pending hosts to new Fetchers until the free slots
- * are used up. It exits on shutdown, once numCompletedInputs reaches numInputs, or once a
- * fetch error has been recorded in shuffleError. On exit, fetcherExecutor is shut down.
- */
- private class RunBroadcastShuffleCallable implements Callable<Void> {
-
- @Override
- public Void call() throws Exception {
- while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
- lock.lock();
- try {
- if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
- if (numCompletedInputs.get() < numInputs) {
- // Signalled when a host is queued, an input completes or a fetcher finishes.
- // The enclosing while re-checks its condition, so an early wakeup only costs
- // an extra iteration.
- wakeLoop.await();
- }
- }
- } finally {
- lock.unlock();
- }
-
- if (shuffleError != null) {
- // InputContext has already been informed of a fatal error. Relying on
- // tez to kill the task.
- break;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("NumCompletedInputs: " + numCompletedInputs);
- }
- if (numCompletedInputs.get() < numInputs) {
- lock.lock();
- try {
- // Free fetcher slots at the start of this pass.
- // NOTE(review): if this is <= 0 (woken by a new host while all slots are busy),
- // one fetcher is still launched before the ++count check below breaks the loop.
- int maxFetchersToRun = numFetchers - numRunningFetchers.get();
- int count = 0;
- while (pendingHosts.peek() != null) {
- InputHost inputHost = null;
- try {
- inputHost = pendingHosts.take();
- } catch (InterruptedException e) {
- if (isShutdown.get()) {
- LOG.info("Interrupted and hasBeenShutdown, Breaking out of BroadcastScheduler Loop");
- break;
- } else {
- throw e;
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing pending host: " + inputHost.toDetailedString());
- }
- // A host can be queued several times; entries with nothing left to fetch are skipped.
- if (inputHost.getNumPendingInputs() > 0) {
- LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
- Fetcher fetcher = constructFetcherForHost(inputHost);
- numRunningFetchers.incrementAndGet();
- // NOTE(review): despite the log message this does not break; the fetcher below
- // is still submitted after shutdown has been detected.
- if (isShutdown.get()) {
- LOG.info("hasBeenShutdown, Breaking out of BroadcastScheduler Loop");
- }
- ListenableFuture<FetchResult> future = fetcherExecutor
- .submit(fetcher);
- Futures.addCallback(future, fetchFutureCallback);
- if (++count >= maxFetchersToRun) {
- break;
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping host: " + inputHost.getHost()
- + " since it has no inputs to process");
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
- }
- LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
- // TODO NEWTEZ Maybe clean up inputs.
- // Attempt to stop any fetchers that are still running.
- if (!fetcherExecutor.isShutdown()) {
- fetcherExecutor.shutdownNow();
- }
- return null;
- }
- }
-
- /**
- * Builds a Fetcher for the given host, configured with this manager's connection,
- * compression and IFile settings. The fetcher is assigned the host's pending inputs,
- * minus those already completed or marked obsolete. The host's pending list is drained
- * here via clearAndGetPendingInputs(). In this chunk the only caller is the scheduler
- * loop, which holds lock while calling it.
- */
- private Fetcher constructFetcherForHost(InputHost inputHost) {
- FetcherBuilder fetcherBuilder = new FetcherBuilder(
- BroadcastShuffleManager.this, inputManager,
- inputContext.getApplicationId(), shuffleSecret, conf);
- fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
- if (codec != null) {
- fetcherBuilder.setCompressionParameters(codec);
- }
- fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
-
- // Remove obsolete inputs from the list being given to the fetcher. Also
- // remove from the obsolete list.
- List<InputAttemptIdentifier> pendingInputsForHost = inputHost
- .clearAndGetPendingInputs();
- for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
- .iterator(); inputIter.hasNext();) {
- InputAttemptIdentifier input = inputIter.next();
- // Avoid adding attempts which have already completed.
- if (completedInputSet.contains(input.getInputIdentifier())) {
- inputIter.remove();
- }
- // NOTE(review): if an attempt is both completed and obsoleted, remove() is called twice
- // for the same element, which standard List iterators reject with IllegalStateException.
- // A `continue` after the first removal would avoid this.
- // Avoid adding attempts which have been marked as OBSOLETE
- if (obsoletedInputs.contains(input)) {
- inputIter.remove();
- obsoletedInputs.remove(input);
- }
- }
- // TODO NEWTEZ Maybe limit the number of inputs being given to a single
- // fetcher, especially in the case where #hosts < #fetchers
- fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
- pendingInputsForHost);
- LOG.info("Created Fetcher for host: " + inputHost.getHost()
- + ", with inputs: " + pendingInputsForHost);
- return fetcherBuilder.build();
- }
-
- /////////////////// Methods for InputEventHandler
-
- /**
- * Records that srcAttemptIdentifier can be fetched from hostName, queues the host for the
- * scheduler and wakes the scheduler loop. knownSrcHosts is keyed by host name only: the
- * InputHost (and so its port) is created on first use, and putIfAbsent keeps the first
- * instance if two callers race (assuming knownSrcHosts is a ConcurrentMap — confirm).
- * The partition argument is not used here. A host may be queued more than once; the
- * scheduler skips entries whose host has no pending inputs.
- *
- * @throws TezUncheckedException if the host cannot be offered to pendingHosts
- */
- public void addKnownInput(String hostName, int port,
- InputAttemptIdentifier srcAttemptIdentifier, int partition) {
- InputHost host = knownSrcHosts.get(hostName);
- if (host == null) {
- host = new InputHost(hostName, port, inputContext.getApplicationId());
- InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
- if (old != null) {
- host = old;
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
- }
- host.addKnownInput(srcAttemptIdentifier);
- lock.lock();
- try {
- boolean added = pendingHosts.offer(host);
- if (!added) {
- String errorMessage = "Unable to add host: " + host.getHost() + " to pending queue";
- LOG.error(errorMessage);
- throw new TezUncheckedException(errorMessage);
- }
- wakeLoop.signal();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Marks an input that produced no data as complete. A NullFetchedInput placeholder is
- * registered the first time its InputIdentifier is seen. The scheduler loop is then woken
- * so it can re-check whether all inputs are done.
- */
- public void addCompletedInputWithNoData(
- InputAttemptIdentifier srcAttemptIdentifier) {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
- LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
-
- if (!completedInputSet.contains(inputIdentifier)) {
- synchronized (completedInputSet) {
- // Re-checked under the completedInputSet monitor so only one caller registers it.
- if (!completedInputSet.contains(inputIdentifier)) {
- registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
- }
- }
- }
-
- // Awake the loop to check for termination.
- lock.lock();
- try {
- wakeLoop.signal();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Registers an input whose data was received via an event rather than fetched.
- * The first copy for an InputIdentifier is committed and registered, and the scheduler
- * loop is woken. Any later copy is aborted. lastProgressTime is updated either way.
- *
- * @throws IOException if committing or aborting fetchedInput fails
- */
- public void addCompletedInputWithData(
- InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
- throws IOException {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-
- LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to "
- + fetchedInput.getType());
- // Count irrespective of whether this is a copy of an already fetched input
- lock.lock();
- try {
- lastProgressTime = System.currentTimeMillis();
- } finally {
- lock.unlock();
- }
-
- boolean committed = false;
- if (!completedInputSet.contains(inputIdentifier)) {
- synchronized (completedInputSet) {
- // Re-checked under the monitor so only the first copy is committed.
- if (!completedInputSet.contains(inputIdentifier)) {
- fetchedInput.commit();
- committed = true;
- registerCompletedInput(fetchedInput);
- }
- }
- }
- if (!committed) {
- fetchedInput.abort(); // If this fails, the fetcher may attempt another
- // abort.
- } else {
- lock.lock();
- try {
- // Signal the wakeLoop to check for termination.
- wakeLoop.signal();
- } finally {
- lock.unlock();
- }
- }
- }
-
- /**
- * Marks an attempt as obsolete so constructFetcherForHost() leaves it out of later fetches.
- * NOTE(review): this synchronizes on `this`, while constructFetcherForHost() reads and
- * modifies obsoletedInputs under `lock` (a different monitor); confirm obsoletedInputs is
- * a thread-safe collection.
- */
- public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
- obsoletedInputs.add(srcAttemptIdentifier);
- // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
- }
-
-
- /** Passes the events straight to inputEventHandler. */
- public void handleEvents(List<Event> events) throws IOException {
- inputEventHandler.handleEvents(events);
- }
-
- /////////////////// End of Methods for InputEventHandler
- /////////////////// Methods from FetcherCallbackHandler
-
- /**
- * Fetcher callback for a successfully fetched input. The first copy for an InputIdentifier
- * is committed, counted in the shuffle counters and registered. Duplicates (e.g. from
- * speculation or retries) are aborted. The host and copyDuration arguments are not used here.
- *
- * @throws IOException if committing or aborting fetchedInput fails
- */
- @Override
- public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
- FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
- throws IOException {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-
- LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
-
- // Count irrespective of whether this is a copy of an already fetched input
- lock.lock();
- try {
- lastProgressTime = System.currentTimeMillis();
- } finally {
- lock.unlock();
- }
-
- boolean committed = false;
- if (!completedInputSet.contains(inputIdentifier)) {
- synchronized (completedInputSet) {
- // Re-checked under the monitor so only the first copy is committed and counted.
- if (!completedInputSet.contains(inputIdentifier)) {
- fetchedInput.commit();
- committed = true;
-
- // Processing counters for completed and commit fetches only. Need
- // additional counters for excessive fetches - which primarily comes
- // in after speculation or retries.
- shuffledInputsCounter.increment(1);
- bytesShuffledCounter.increment(fetchedBytes);
- if (fetchedInput.getType() == Type.MEMORY) {
- bytesShuffledToMemCounter.increment(fetchedBytes);
- } else {
- bytesShuffledToDiskCounter.increment(fetchedBytes);
- }
- decompressedDataSizeCounter.increment(decompressedLength);
-
- registerCompletedInput(fetchedInput);
- }
- }
- }
- if (!committed) {
- fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
- } else {
- lock.lock();
- try {
- // Signal the wakeLoop to check for termination.
- wakeLoop.signal();
- } finally {
- lock.unlock();
- }
- }
- // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
- }
-
- /**
- * Fetcher callback for a failed fetch. It increments failedShufflesCounter and immediately
- * sends an InputReadErrorEvent for the source attempt. A null attempt is reported to the
- * input context as a fatal error. connectFailed is only logged.
- */
- @Override
- public void fetchFailed(String host,
- InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
- // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
- // For now, reporting immediately.
- // NOTE(review): this message lacks ", " before "InputIdentifier: " and logs
- // srcAttemptIdentifier twice.
- LOG.info("Fetch failed for src: " + srcAttemptIdentifier
- + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
- + connectFailed);
- failedShufflesCounter.increment(1);
- if (srcAttemptIdentifier == null) {
- String message = "Received fetchFailure for an unknown src (null)";
- LOG.fatal(message);
- inputContext.fatalError(null, message);
- } else {
- InputReadErrorEvent readError = new InputReadErrorEvent(
- "Fetch failure while fetching from "
- + TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(),
- srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
- srcAttemptIdentifier.getAttemptNumber()),
- srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
- srcAttemptIdentifier.getAttemptNumber());
-
- List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
- failedEvents.add(readError);
- inputContext.sendEvents(failedEvents);
- }
- }
- /////////////////// End of Methods from FetcherCallbackHandler
-
- /**
- * Sets isShutdown and calls shutdownNow() on the scheduler and fetcher executors that are
- * not already shut down. Nothing in this body actually throws InterruptedException.
- * NOTE(review): run() already calls schedulerExecutor.shutdown(), so isShutdown() is true
- * afterwards and the first branch will not interrupt a scheduler loop started by run().
- */
- public void shutdown() throws InterruptedException {
- isShutdown.set(true);
- if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
- this.schedulerExecutor.shutdownNow(); // Interrupt the scheduler thread
- }
- if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
- this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
- }
- }
-
- /**
- * Under lock: records the input's identifier as completed, queues the input for
- * consumers, sends the input-ready notification only on the first call, and increments
- * numCompletedInputs.
- */
- private void registerCompletedInput(FetchedInput fetchedInput) {
- lock.lock();
- try {
- completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
- completedInputs.add(fetchedInput);
- if (!inputReadyNotificationSent.getAndSet(true)) {
- inputContext.inputIsReady();
- }
- numCompletedInputs.incrementAndGet();
- } finally {
- lock.unlock();
- }
- }
-
- /////////////////// Methods for walking the available inputs
-
- /**
- * @return true if the head of completedInputs is a real (non-placeholder) input.
- * NOTE(review): a NullFetchedInput at the head makes this return false even
- * when real inputs are queued behind it.
- */
- public boolean newInputAvailable() {
- FetchedInput head = completedInputs.peek();
- if (head == null || head instanceof NullFetchedInput) {
- return false;
- } else {
- return true;
- }
- }
-
- /**
- * @return true once numCompletedInputs equals numInputs. Inputs registered as having
- * no data count as completed.
- */
- public boolean allInputsFetched() {
- lock.lock();
- try {
- return numCompletedInputs.get() == numInputs;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns the next completed input, skipping NullFetchedInput placeholders.
- *
- * @return the next available input, or null if none is queued and all inputs have been
- * fetched. Blocks while no input is queued but more are still expected.
- * @throws InterruptedException if interrupted while blocked waiting for an input
- */
- public FetchedInput getNextInput() throws InterruptedException {
- FetchedInput input = null;
- do {
- // Check for no additional inputs
- lock.lock();
- try {
- input = completedInputs.peek();
- // allInputsFetched() acquires lock again while it is held here; this presumes
- // lock is reentrant (e.g. a ReentrantLock) — confirm.
- if (input == null && allInputsFetched()) {
- break;
- }
- } finally {
- lock.unlock();
- }
- input = completedInputs.take(); // block
- } while (input instanceof NullFetchedInput);
- return input;
- }
- /////////////////// End of methods for walking the available inputs
-
- /** Creates a BroadcastKVReader over this manager using its codec and IFile settings. */
- @SuppressWarnings("rawtypes")
- public BroadcastKVReader createReader(TezCounter inputRecordCounter) throws IOException {
- return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength,
- ifileBufferSize, inputRecordCounter);
- }
-
- /**
- * Placeholder added to the completed input list when an input has no data, so that the
- * input still counts toward completion. It carries no data: every operation below throws
- * UnsupportedOperationException, and getNextInput()/newInputAvailable() skip it.
- */
- private class NullFetchedInput extends FetchedInput {
-
- public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
- // -1 values presumably stand for "no size" since there is no data — confirm against FetchedInput.
- super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
-
- @Override
- public void commit() throws IOException {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
-
- @Override
- public void abort() throws IOException {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
-
- @Override
- public void free() {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
- }
-
-
- /**
- * Completion callback for the scheduler loop. Success is only logged. A failure is
- * reported to the input context as fatal unless the manager has already been shut down.
- */
- private class SchedulerFutureCallback implements FutureCallback<Void> {
-
- @Override
- public void onSuccess(Void result) {
- LOG.info("Scheduler thread completed");
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (isShutdown.get()) {
- LOG.info("Already shutdown. Ignoring error: " + t);
- } else {
- LOG.error("Scheduler failed with error: ", t);
- inputContext.fatalError(t, "Broadcast Scheduler Failed");
- }
- }
-
- }
-
- /**
- * Completion callback for each Fetcher. On success, inputs the fetcher reports as still
- * pending are added back to their host, and the host is re-queued. On failure, the error is
- * recorded in shuffleError and reported as fatal. Both paths release the fetcher slot and
- * wake the scheduler loop, except for failures after shutdown.
- */
- private class FetchFutureCallback implements FutureCallback<FetchResult> {
-
- // Frees a fetcher slot and wakes the scheduler loop.
- private void doBookKeepingForFetcherComplete() {
- numRunningFetchers.decrementAndGet();
- lock.lock();
- try {
- wakeLoop.signal();
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void onSuccess(FetchResult result) {
- Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
- if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
- InputHost inputHost = knownSrcHosts.get(result.getHost());
- // Only checked when assertions are enabled (-ea).
- assert inputHost != null;
- for (InputAttemptIdentifier input : pendingInputs) {
- inputHost.addKnownInput(input);
- }
- pendingHosts.add(inputHost);
- }
- doBookKeepingForFetcherComplete();
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (isShutdown.get()) {
- LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
- } else {
- LOG.error("Fetcher failed with error: ", t);
- shuffleError = t;
- inputContext.fatalError(t, "Fetch failed");
- doBookKeepingForFetcherComplete();
- }
- }
- }
-
- /** Memory callback: records the amount of memory assigned to this manager. */
- @Override
- public void memoryAssigned(long assignedSize) {
- this.initialMemoryAvailable = assignedSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
new file mode 100644
index 0000000..796890c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.readers;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+
+/**
+ * A {@link KeyValueReader} over the inputs made available by a {@link ShuffleManager}.
+ * Inputs are consumed in the order {@link ShuffleManager#getNextInput()} returns them, with no
+ * merging or sorting. Each input is opened as an IFile reader (in-memory or stream based),
+ * drained record by record, and then closed and freed before the next one is requested.
+ *
+ * @param <K> key type; its class is read from the intermediate-input key class in the conf
+ * @param <V> value type; its class is read from the intermediate-input value class in the conf
+ */
+public class ShuffledUnorderedKVReader<K, V> implements KeyValueReader {
+
+ private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVReader.class);
+
+ private final ShuffleManager shuffleManager;
+ private final CompressionCodec codec;
+
+ private final Class<K> keyClass;
+ private final Class<V> valClass;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valDeserializer;
+ private final DataInputBuffer keyIn;
+ private final DataInputBuffer valIn;
+
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+ private final int ifileBufferSize;
+
+ private final TezCounter inputRecordCounter;
+
+ private K key;
+ private V value;
+
+ // The input being read and its reader. Both are cleared when the input is released, so a
+ // released reader is never read or closed again and an input is never freed twice.
+ private FetchedInput currentFetchedInput;
+ private IFile.Reader currentReader;
+
+ // TODO Remove this once per I/O counters are separated properly. Relying on
+ // the counter at the moment will generate aggregate numbers.
+ private int numRecordsRead = 0;
+
+ /**
+ * @param shuffleManager source of completed inputs
+ * @param conf configuration providing the key/value classes and their serializations
+ * @param codec codec for on-disk inputs; null if intermediate data is not compressed
+ * @param ifileReadAhead whether IFile read-ahead is enabled for on-disk inputs
+ * @param ifileReadAheadLength IFile read-ahead length
+ * @param ifileBufferSize buffer size for on-disk IFile readers
+ * @param inputRecordCounter incremented once for every record returned by {@link #next()}
+ * @throws IOException if the key or value deserializer cannot be opened
+ */
+ public ShuffledUnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
+ CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
+ TezCounter inputRecordCounter)
+ throws IOException {
+ this.shuffleManager = shuffleManager;
+
+ this.codec = codec;
+ this.ifileReadAhead = ifileReadAhead;
+ this.ifileReadAheadLength = ifileReadAheadLength;
+ this.ifileBufferSize = ifileBufferSize;
+ this.inputRecordCounter = inputRecordCounter;
+
+ this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+
+ this.keyIn = new DataInputBuffer();
+ this.valIn = new DataInputBuffer();
+
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(keyIn);
+ this.valDeserializer = serializationFactory.getDeserializer(valClass);
+ this.valDeserializer.open(valIn);
+ }
+
+ // TODO NEWTEZ Maybe add an interface to check whether next will block.
+
+ /**
+ * Moves to the next key/value pair. Once the current input is exhausted, it advances to the
+ * next available input, which may block until that input is ready.
+ *
+ * @return true if another key/value pair exists, false if all inputs have been consumed.
+ * Calling this again after it has returned false keeps returning false.
+ * @throws IOException if reading fails, or if interrupted while waiting for an input
+ */
+ @Override
+ public boolean next() throws IOException {
+ while (!readNextFromCurrentReader()) {
+ if (!moveToNextInput()) {
+ LOG.info("Num Records read: " + numRecordsRead);
+ return false;
+ }
+ }
+ inputRecordCounter.increment(1);
+ numRecordsRead++;
+ return true;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ return key;
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException {
+ return value;
+ }
+
+ /**
+ * Tries to read the next key and value from the current reader into key/value.
+ *
+ * @return true if a record was read, false if there is no open reader or it is exhausted
+ * @throws IOException if reading or deserializing fails
+ */
+ private boolean readNextFromCurrentReader() throws IOException {
+ // No input opened yet, or the previous one has already been released.
+ if (currentReader == null) {
+ return false;
+ }
+ if (!currentReader.nextRawKey(keyIn)) {
+ return false;
+ }
+ currentReader.nextRawValue(valIn);
+ key = keyDeserializer.deserialize(key);
+ value = valDeserializer.deserialize(value);
+ return true;
+ }
+
+ /**
+ * Releases the current input (if any) and opens the next available one. May block until
+ * the next input is ready.
+ *
+ * @return true if another input was opened, false if all inputs have been consumed
+ * @throws IOException if closing, freeing or opening an input fails, or if the thread is
+ * interrupted while waiting (the interrupt status is restored)
+ */
+ private boolean moveToNextInput() throws IOException {
+ if (currentFetchedInput != null) {
+ // Clear the fields before releasing, so that neither a failure below nor a later call
+ // (e.g. next() after it returned false) can touch the released reader or input again.
+ IFile.Reader finishedReader = currentReader;
+ FetchedInput finishedInput = currentFetchedInput;
+ currentReader = null;
+ currentFetchedInput = null;
+ try {
+ if (finishedReader != null) {
+ finishedReader.close();
+ }
+ } finally {
+ finishedInput.free();
+ }
+ }
+ FetchedInput nextInput;
+ try {
+ nextInput = shuffleManager.getNextInput();
+ } catch (InterruptedException e) {
+ // Keep the interrupt visible to callers further up the stack.
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting for next available input", e);
+ throw new IOException(e);
+ }
+ if (nextInput == null) {
+ return false; // No more inputs
+ }
+ // Track the input before opening it, so it is still freed on the next call if opening fails.
+ currentFetchedInput = nextInput;
+ currentReader = openIFileReader(nextInput);
+ return true;
+ }
+
+ /**
+ * Opens an IFile reader over a fetched input. For MEMORY inputs this is an
+ * {@link InMemoryReader} over the buffered bytes. Otherwise it is a stream-based
+ * {@link IFile.Reader} using the configured codec and read-ahead settings.
+ *
+ * @param fetchedInput the input to open
+ * @return a reader over the input's records
+ * @throws IOException if the reader cannot be created
+ */
+ public IFile.Reader openIFileReader(FetchedInput fetchedInput)
+ throws IOException {
+ if (fetchedInput.getType() == Type.MEMORY) {
+ MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
+
+ return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
+ mfi.getBytes(), 0, (int) mfi.getActualSize());
+ } else {
+ return new IFile.Reader(fetchedInput.getInputStream(),
+ fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index e69a955..adbeff8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -29,51 +29,68 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
-import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
+import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleInputEventHandler;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.readers.ShuffledUnorderedKVReader;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
+import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
import com.google.common.base.Preconditions;
-public class ShuffledUnorderedKVInput implements LogicalInput {
+public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallback {
private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
private Configuration conf;
+ // Must be set before initialize(); -1 means it has not been set.
private int numInputs = -1;
- private BroadcastShuffleManager shuffleManager;
+ // Set in initialize().
+ private TezInputContext inputContext;
+ // Created in start(); close() only shuts it down once it has been created.
+ private ShuffleManager shuffleManager;
+ // Events buffered here until start() drains them.
private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
private volatile long firstEventReceivedTime = -1;
@SuppressWarnings("rawtypes")
- private BroadcastKVReader kvReader;
+ private ShuffledUnorderedKVReader kvReader;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private TezCounter inputRecordCounter;
+ private SimpleFetchedInputAllocator inputManager;
+ private BroadcastShuffleInputEventHandler inputEventHandler;
+
+ // Memory granted through memoryAssigned(); -1 until then. start() refuses to run
+ // while it is still -1.
+ private volatile long initialMemoryAvailable = -1;
+
public ShuffledUnorderedKVInput() {
}
+ /**
+ * Reads the configuration from the user payload and requests memory. With zero inputs,
+ * no memory is requested and the input is marked started and reported ready at once.
+ * Otherwise the request is sized by getInitialMemoryReq() and the grant arrives through
+ * memoryAssigned(). The shuffle components are only created later, in start().
+ *
+ * @return an empty list; no events are generated here
+ */
@Override
public List<Event> initialize(TezInputContext inputContext) throws Exception {
Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
+ this.inputContext = inputContext;
this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
- this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
- this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
if (numInputs == 0) {
+ // NOTE(review): with this change inputRecordCounter is no longer set on this path;
+ // confirm nothing reads it when there are no inputs.
inputContext.requestInitialMemory(0l, null);
isStarted.set(true);
inputContext.inputIsReady();
return Collections.emptyList();
+ } else {
+ long initalMemReq = getInitialMemoryReq();
+ this.inputContext.requestInitialMemory(initalMemReq, this);
}
- this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
+ this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
+ this.inputRecordCounter = inputContext.getCounters().findCounter(
+ TaskCounter.INPUT_RECORDS_PROCESSED);
return Collections.emptyList();
}
@@ -81,8 +98,50 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
public void start() throws IOException {
synchronized (this) {
if (!isStarted.get()) {
+ ////// Initial configuration
+ Preconditions.checkState(initialMemoryAvailable != -1,
+ "Initial memory available must be configured before starting");
+ CompressionCodec codec;
+ if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+ Class<? extends CompressionCodec> codecClass = ConfigUtils
+ .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } else {
+ codec = null;
+ }
+
+ boolean ifileReadAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+ int ifileReadAheadLength = 0;
+ int ifileBufferSize = 0;
+
+ if (ifileReadAhead) {
+ ifileReadAheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+ }
+ ifileBufferSize = conf.getInt("io.file.buffer.size",
+ TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+
+ this.shuffleManager = new ShuffleManager(inputContext, conf, numInputs);
+
+ this.inputManager = new SimpleFetchedInputAllocator(inputContext.getUniqueIdentifier(), conf,
+ inputContext.getTotalMemoryAvailableToTask());
+ inputManager.setInitialMemoryAvailable(initialMemoryAvailable);
+ inputManager.configureAndStart();
+
+ this.inputEventHandler = new BroadcastShuffleInputEventHandler(
+ inputContext, shuffleManager, inputManager, codec, ifileReadAhead,
+ ifileReadAheadLength);
+
+ this.shuffleManager.setCompressionCodec(codec);
+ this.shuffleManager.setIfileParameters(ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
+ this.shuffleManager.setFetchedInputAllocator(inputManager);
+ this.shuffleManager.setInputEventHandler(inputEventHandler);
+ ////// End of Initial configuration
+
this.shuffleManager.run();
- this.kvReader = this.shuffleManager.createReader(inputRecordCounter);
+ this.kvReader = createReader(inputRecordCounter, codec,
+ ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
List<Event> pending = new LinkedList<Event>();
pendingEvents.drainTo(pending);
if (pending.size() > 0) {
@@ -129,6 +188,9 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
if (firstEventReceivedTime == -1) {
firstEventReceivedTime = System.currentTimeMillis();
}
+ // This queue will keep growing if the Processor decides never to
+ // start the Input. The Input, however, has no way of knowing whether
+ // start will be invoked or not.
pendingEvents.addAll(inputEvents);
return;
}
@@ -139,7 +201,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
@Override
public List<Event> close() throws Exception {
- if (numInputs != 0) {
+ if (this.shuffleManager != null) {
this.shuffleManager.shutdown();
}
return null;
@@ -150,4 +212,23 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
this.numInputs = numInputs;
}
-}
+ /**
+ * MemoryUpdateCallback hook: remembers the memory granted for this input so that start()
+ * can hand it to the fetched-input allocator.
+ */
+ @Override
+ public void memoryAssigned(long assignedSize) {
+ initialMemoryAvailable = assignedSize;
+ }
+
+ /** Memory to request from the framework, as sized by SimpleFetchedInputAllocator. */
+ private long getInitialMemoryReq() {
+ long totalTaskMemory = inputContext.getTotalMemoryAvailableToTask();
+ return SimpleFetchedInputAllocator.getInitialMemoryReq(conf, totalTaskMemory);
+ }
+
+
+ /**
+ * Builds the unordered reader over shuffleManager. Note that this method's argument order
+ * differs from the reader constructor's; the arguments are mapped across here.
+ */
+ @SuppressWarnings("rawtypes")
+ private ShuffledUnorderedKVReader createReader(TezCounter inputRecordCounter, CompressionCodec codec,
+ int ifileBufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength)
+ throws IOException {
+ ShuffledUnorderedKVReader reader = new ShuffledUnorderedKVReader(shuffleManager, conf, codec,
+ ifileReadAheadEnabled, ifileReadAheadLength, ifileBufferSize, inputRecordCounter);
+ return reader;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleEventHandler.java
new file mode 100644
index 0000000..ee9979c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleEventHandler.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * Handles batches of events delivered to a shuffle-based input.
+ */
+public interface ShuffleEventHandler {
+
+ /**
+ * Processes a batch of events.
+ *
+ * @param events the events to process
+ * @throws IOException if handling an event fails
+ */
+ void handleEvents(List<Event> events) throws IOException;
+}