You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/20 19:19:30 UTC
[2/2] git commit: TEZ-414. Implement a non sorted,
non aggregated K-V input/output pair. (sseth)
TEZ-414. Implement a non sorted, non aggregated K-V input/output pair.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bd76ffcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bd76ffcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bd76ffcf
Branch: refs/heads/TEZ-398
Commit: bd76ffcf2b44eff1c2d8a3abfd24a978edccdc58
Parents: bda095c
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 20 10:18:36 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 20 10:18:36 2013 -0700
----------------------------------------------------------------------
.../broadcast/input/BroadcastInputManager.java | 138 +++++
.../broadcast/input/BroadcastKVReader.java | 225 +++++++
.../BroadcastShuffleInputEventHandler.java | 88 +++
.../input/BroadcastShuffleManager.java | 489 +++++++++++++++
.../broadcast/output/FileBasedKVWriter.java | 125 ++++
.../engine/common/InputAttemptIdentifier.java | 95 +++
.../tez/engine/common/InputIdentifier.java | 56 ++
.../tez/engine/common/TezEngineUtils.java | 21 +
.../tez/engine/common/shuffle/impl/Fetcher.java | 42 +-
.../common/shuffle/impl/InMemoryReader.java | 5 +-
.../tez/engine/common/shuffle/impl/MapHost.java | 14 +-
.../engine/common/shuffle/impl/MapOutput.java | 13 +-
.../common/shuffle/impl/MergeManager.java | 18 +-
.../tez/engine/common/shuffle/impl/Shuffle.java | 1 +
.../common/shuffle/impl/ShuffleHeader.java | 8 +
.../shuffle/impl/ShuffleInputEventHandler.java | 5 +-
.../common/shuffle/impl/ShuffleScheduler.java | 60 +-
.../shuffle/impl/TaskAttemptIdentifier.java | 95 ---
.../engine/common/sort/impl/ExternalSorter.java | 26 +-
.../tez/engine/common/sort/impl/IFile.java | 2 +-
.../newoutput/TezLocalTaskOutputFiles.java | 21 +-
.../task/local/newoutput/TezTaskOutput.java | 13 +-
.../local/newoutput/TezTaskOutputFiles.java | 18 +-
.../lib/input/ShuffledUnorderedKVInput.java | 76 +++
.../engine/lib/output/OnFileSortedOutput.java | 4 +-
.../lib/output/OnFileUnorderedKVOutput.java | 98 +++
.../org/apache/tez/engine/newapi/KVReader.java | 5 +-
.../engine/shuffle/common/DiskFetchedInput.java | 111 ++++
.../tez/engine/shuffle/common/FetchResult.java | 70 +++
.../tez/engine/shuffle/common/FetchedInput.java | 144 +++++
.../shuffle/common/FetchedInputAllocator.java | 31 +
.../shuffle/common/FetchedInputCallback.java | 29 +
.../tez/engine/shuffle/common/Fetcher.java | 608 +++++++++++++++++++
.../engine/shuffle/common/FetcherCallback.java | 31 +
.../tez/engine/shuffle/common/InputHost.java | 90 +++
.../shuffle/common/MemoryFetchedInput.java | 89 +++
.../tez/engine/shuffle/common/ShuffleUtils.java | 16 +
tez-engine/src/main/proto/ShufflePayloads.proto | 9 +-
38 files changed, 2781 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
new file mode 100644
index 0000000..78d2e0c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.broadcast.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.shuffle.common.DiskFetchedInput;
+import org.apache.tez.engine.shuffle.common.FetchedInput;
+import org.apache.tez.engine.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.engine.shuffle.common.FetchedInputCallback;
+import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
+
+public class BroadcastInputManager implements FetchedInputAllocator,
+ FetchedInputCallback {
+
+ private final Configuration conf;
+
+ private final TezTaskOutputFiles fileNameAllocator;
+ private final LocalDirAllocator localDirAllocator;
+
+ // Configuration parameters
+ private final long memoryLimit;
+ private final long maxSingleShuffleLimit;
+
+ private long usedMemory = 0;
+
+ public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
+ this.conf = conf;
+
+ this.fileNameAllocator = new TezTaskOutputFiles(conf,
+ inputContext.getUniqueIdentifier());
+ this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+ // Setup configuration
+ final float maxInMemCopyUse = conf.getFloat(
+ TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+ if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+ throw new IllegalArgumentException("Invalid value for "
+ + TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+ + maxInMemCopyUse);
+ }
+
+ // Allow unit tests to fix Runtime memory
+ this.memoryLimit = (long) (conf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY,
+ Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
+
+ final float singleShuffleMemoryLimitPercent = conf.getFloat(
+ TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+ if (singleShuffleMemoryLimitPercent <= 0.0f
+ || singleShuffleMemoryLimitPercent > 1.0f) {
+ throw new IllegalArgumentException("Invalid value for "
+ + TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+ + singleShuffleMemoryLimitPercent);
+ }
+
+ this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
+ }
+
+ @Override
+ public synchronized FetchedInput allocate(long size,
+ InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+ if (size > maxSingleShuffleLimit
+ || this.usedMemory + size > this.memoryLimit) {
+ return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
+ localDirAllocator, fileNameAllocator);
+ } else {
+ this.usedMemory += size;
+ return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
+ }
+ }
+
+ @Override
+ public void fetchComplete(FetchedInput fetchedInput) {
+ switch (fetchedInput.getType()) {
+ // Not tracking anything here.
+ case DISK:
+ case MEMORY:
+ break;
+ default:
+ throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+ + " not expected for Broadcast fetch");
+ }
+ }
+
+ @Override
+ public void fetchFailed(FetchedInput fetchedInput) {
+ cleanup(fetchedInput);
+ }
+
+ @Override
+ public void freeResources(FetchedInput fetchedInput) {
+ cleanup(fetchedInput);
+ }
+
+ private void cleanup(FetchedInput fetchedInput) {
+ switch (fetchedInput.getType()) {
+ case DISK:
+ break;
+ case MEMORY:
+ unreserve(fetchedInput.getSize());
+ break;
+ default:
+ throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+ + " not expected for Broadcast fetch");
+ }
+ }
+
+ private synchronized void unreserve(long size) {
+ this.usedMemory -= size;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
new file mode 100644
index 0000000..b36c240
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.broadcast.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.newapi.KVReader;
+import org.apache.tez.engine.shuffle.common.FetchedInput;
+import org.apache.tez.engine.shuffle.common.FetchedInput.Type;
+import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
+
+public class BroadcastKVReader<K, V> implements KVReader {
+
+ private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
+
+ private final BroadcastShuffleManager shuffleManager;
+ private final Configuration conf;
+ private final CompressionCodec codec;
+
+ private final Class<K> keyClass;
+ private final Class<V> valClass;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valDeserializer;
+ private final DataInputBuffer keyIn;
+ private final DataInputBuffer valIn;
+
+ private final SimpleValueIterator valueIterator;
+ private final SimpleIterable valueIterable;
+
+ private K key;
+ private V value;
+
+ private FetchedInput currentFetchedInput;
+ private IFile.Reader currentReader;
+
+
+ public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
+ Configuration conf) {
+ this.shuffleManager = shuffleManager;
+ this.conf = conf;
+
+ if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
+ Class<? extends CompressionCodec> codecClass = ConfigUtils
+ .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } else {
+ codec = null;
+ }
+
+ this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ this.valClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+
+ this.keyIn = new DataInputBuffer();
+ this.valIn = new DataInputBuffer();
+
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.valDeserializer = serializationFactory.getDeserializer(valClass);
+
+ this.valueIterator = new SimpleValueIterator();
+ this.valueIterable = new SimpleIterable(this.valueIterator);
+ }
+
+ // TODO NEWTEZ Maybe add an interface to check whether next will block.
+
+ /**
+ * Moves to the next key/values(s) pair
+ *
+ * @return true if another key/value(s) pair exists, false if there are no
+ * more.
+ * @throws IOException
+ * if an error occurs
+ */
+ @Override
+ public boolean next() throws IOException {
+ if (readNextFromCurrentReader()) {
+ return true;
+ } else {
+ boolean nextInputExists = moveToNextInput();
+ while (nextInputExists) {
+ if(readNextFromCurrentReader()) {
+ return true;
+ }
+ nextInputExists = moveToNextInput();
+ }
+ return false;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public KVRecord getCurrentKV() throws IOException {
+ this.valueIterator.setValue(value);
+ return new KVRecord((Object)key, (Iterable<Object>)this.valueIterable);
+ }
+
+ /**
+ * Tries reading the next key and value from the current reader.
+ * @return true if the current reader has more records
+ * @throws IOException
+ */
+ private boolean readNextFromCurrentReader() throws IOException {
+ // Initial reader.
+ if (this.currentReader == null) {
+ return false;
+ } else {
+ boolean hasMore = this.currentReader.nextRawKey(keyIn);
+ if (hasMore) {
+ this.currentReader.nextRawValue(valIn);
+ this.key = keyDeserializer.deserialize(this.key);
+ this.value = valDeserializer.deserialize(this.value);
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Moves to the next available input. This method may block if the input is not ready yet.
+ * Also takes care of closing the previous input.
+ *
+ * @return true if the next input exists, false otherwise
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private boolean moveToNextInput() throws IOException {
+ if (currentReader != null) { // Close the current reader.
+ currentReader.close();
+ currentFetchedInput.free();
+ }
+ try {
+ currentFetchedInput = shuffleManager.getNextInput();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for next available input", e);
+ throw new IOException(e);
+ }
+ if (currentFetchedInput == null) {
+ return false; // No more inputs
+ } else {
+ currentReader = openIFileReader(currentFetchedInput);
+ return true;
+ }
+ }
+
+ public IFile.Reader openIFileReader(FetchedInput fetchedInput)
+ throws IOException {
+ if (fetchedInput.getType() == Type.MEMORY) {
+ MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
+
+ return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
+ mfi.getBytes(), 0, (int) mfi.getSize());
+ } else {
+ return new IFile.Reader(conf, fetchedInput.getInputStream(),
+ fetchedInput.getSize(), codec, null);
+ }
+ }
+
+
+
+ // TODO NEWTEZ Move this into a common class. Also used in SImpleInput
+ private class SimpleValueIterator implements Iterator<V> {
+
+ private V value;
+
+ public void setValue(V value) {
+ this.value = value;
+ }
+
+ public boolean hasNext() {
+ return value != null;
+ }
+
+ public V next() {
+ V value = this.value;
+ this.value = null;
+ return value;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class SimpleIterable implements Iterable<V> {
+ private final Iterator<V> iterator;
+ public SimpleIterable(Iterator<V> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public Iterator<V> iterator() {
+ return iterator;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
new file mode 100644
index 0000000..703e1d9
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.engine.broadcast.input;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
+import org.apache.tez.engine.common.shuffle.impl.ShuffleInputEventHandler;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Translates events delivered to a broadcast input into calls on its
+ * {@link BroadcastShuffleManager}. A DataMovementEvent either registers a
+ * new input with its host, or marks the input complete when the producer
+ * generated no output. An InputFailedEvent marks that attempt as obsolete.
+ */
+public class BroadcastShuffleInputEventHandler {
+
+  private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
+
+  private final BroadcastShuffleManager shuffleManager;
+
+  /**
+   * @param inputContext currently unused
+   * @param shuffleManager receives the decoded events
+   */
+  public BroadcastShuffleInputEventHandler(TezInputContext inputContext, BroadcastShuffleManager shuffleManager) {
+    this.shuffleManager = shuffleManager;
+  }
+
+  /**
+   * Processes the given events in order.
+   *
+   * @throws TezUncheckedException for an event type other than
+   *     DataMovementEvent / InputFailedEvent, or an unparseable payload
+   * @throws IllegalArgumentException for a DataMovementEvent whose source
+   *     index is not 0
+   */
+  public void handleEvents(List<Event> events) {
+    for (Event event : events) {
+      handleEvent(event);
+    }
+  }
+
+  private void handleEvent(Event event) {
+    if (event instanceof DataMovementEvent) {
+      processDataMovementEvent((DataMovementEvent)event);
+    } else if (event instanceof InputFailedEvent) {
+      processInputFailedEvent((InputFailedEvent)event);
+    } else {
+      throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
+    }
+  }
+
+  private void processDataMovementEvent(DataMovementEvent dme) {
+    // A broadcast producer emits a single output, so only index 0 is valid.
+    // The message template is only formatted when the check fails.
+    Preconditions.checkArgument(dme.getSourceIndex() == 0,
+        "Unexpected srcIndex: %s on DataMovementEvent. Can only be 0",
+        dme.getSourceIndex());
+    DataMovementEventPayloadProto shufflePayload;
+    try {
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+    }
+    if (shufflePayload.getOutputGenerated()) {
+      InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
+          dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
+      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
+          srcAttemptIdentifier, 0);
+    } else {
+      shuffleManager.addCompletedInputWithNoData(
+          new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
+    }
+  }
+
+  private void processInputFailedEvent(InputFailedEvent ife) {
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
+        ife.getTargetIndex(), ife.getVersion());
+    shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
new file mode 100644
index 0000000..dace07c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.broadcast.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
+import org.apache.tez.engine.common.InputIdentifier;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.shuffle.common.FetchResult;
+import org.apache.tez.engine.shuffle.common.FetchedInput;
+import org.apache.tez.engine.shuffle.common.Fetcher;
+import org.apache.tez.engine.shuffle.common.FetcherCallback;
+import org.apache.tez.engine.shuffle.common.InputHost;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
+import org.apache.tez.engine.shuffle.common.Fetcher.FetcherBuilder;
+import org.apache.tez.engine.shuffle.common.FetchedInputAllocator;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class BroadcastShuffleManager implements FetcherCallback {
+
+ private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
+
+ // Assigned once by the constructor.
+ private TezInputContext inputContext;
+ // Number of inputs that must complete before the scheduling loop exits.
+ private int numInputs;
+ private Configuration conf;
+
+ // Decodes incoming events and calls back into this manager.
+ private final BroadcastShuffleInputEventHandler inputEventHandler;
+ // A BroadcastInputManager: decides memory vs. disk for each fetched input.
+ private final FetchedInputAllocator inputManager;
+
+ // Fixed pool of daemon "Fetcher #%d" threads, plus the Guava listening
+ // wrapper used to attach fetchFutureCallback to each submitted Fetcher.
+ private final ExecutorService fetcherRawExecutor;
+ private final ListeningExecutorService fetcherExecutor;
+
+ // Committed inputs, including NullFetchedInput entries for inputs with no data.
+ private final BlockingQueue<FetchedInput> completedInputs;
+ // Identifiers already completed; used to drop duplicate fetch results and
+ // to filter the fetch list built for a host.
+ private final Set<InputIdentifier> completedInputSet;
+ // NOTE(review): only removed from in this part of the file; confirm where
+ // (if anywhere) entries are added.
+ private final Set<InputIdentifier> pendingInputs;
+ // hostName -> InputHost, shared by every input reported from that host.
+ private final ConcurrentMap<String, InputHost> knownSrcHosts;
+ // Hosts with inputs waiting to be handed to a fetcher.
+ private final Set<InputHost> pendingHosts;
+ // Attempts reported via obsoleteKnownInput; skipped (and removed from this
+ // set) when a host's fetch list is built.
+ private final Set<InputAttemptIdentifier> obsoletedInputs;
+
+ private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
+
+ // startTime: construction time. lastProgressTime: updated on every
+ // successful fetch, including duplicates.
+ private final long startTime;
+ private long lastProgressTime;
+
+ // Wraps the scheduling loop; created by run().
+ private FutureTask<Void> runShuffleFuture;
+
+ // Required to be held when manipulating pendingHosts
+ // NOTE(review): wakeLoop is a Condition of this ReentrantLock, so await() /
+ // signal() must be called between lock.lock() and lock.unlock();
+ // synchronized(lock) does not acquire it (IllegalMonitorStateException).
+ private ReentrantLock lock = new ReentrantLock();
+ private Condition wakeLoop = lock.newCondition();
+
+ // Maximum concurrent fetchers (min of configured parallel copies and numInputs)
+ // and the number currently submitted.
+ private final int numFetchers;
+ private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
+
+ // Parameters required by Fetchers
+ private final SecretKey shuffleSecret;
+ private final int connectionTimeout;
+ private final int readTimeout;
+ // Both null when intermediate input compression is disabled.
+ private final CompressionCodec codec;
+ private final Decompressor decompressor;
+
+ // Attached to every submitted fetcher future.
+ private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
+
+ // When non-null the scheduling loop stops. NOTE(review): assigned outside
+ // this view, presumably by the fetch callback.
+ private volatile Throwable shuffleError;
+
+ // TODO NEWTEZ Add counters.
+
+  /**
+   * Creates a manager for {@code numInputs} broadcast inputs. It sets up:
+   * the event handler and memory/disk allocator, the bookkeeping collections,
+   * a daemon fetcher pool sized to min(parallel copies, numInputs), and the
+   * secret, timeouts and codec handed to each fetcher.
+   */
+  public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.numInputs = numInputs;
+
+    this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this);
+    this.inputManager = new BroadcastInputManager(inputContext, conf);
+
+    // Bookkeeping; the per-input structures are presized to numInputs.
+    this.pendingInputs = Collections.newSetFromMap(
+        new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+    this.completedInputSet = Collections.newSetFromMap(
+        new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+    this.completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
+    this.knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
+    this.pendingHosts = Collections.newSetFromMap(
+        new ConcurrentHashMap<InputHost, Boolean>());
+    this.obsoletedInputs = Collections.newSetFromMap(
+        new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
+
+    // Never run more fetchers than there are inputs.
+    this.numFetchers = Math.min(
+        conf.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES),
+        numInputs);
+    this.fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Fetcher #%d")
+            .build());
+    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+
+    this.startTime = System.currentTimeMillis();
+    this.lastProgressTime = this.startTime;
+
+    this.shuffleSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(
+        inputContext.getServiceConsumerMetaData(
+            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+
+    this.connectionTimeout = conf.getInt(
+        TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT);
+    this.readTimeout = conf.getInt(
+        TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+
+    // Codec and pooled decompressor exist only when intermediate input is compressed.
+    CompressionCodec inputCodec = null;
+    Decompressor inputDecompressor = null;
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+      Class<? extends CompressionCodec> codecType = ConfigUtils
+          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      inputCodec = ReflectionUtils.newInstance(codecType, conf);
+      inputDecompressor = CodecPool.getDecompressor(inputCodec);
+    }
+    this.codec = inputCodec;
+    this.decompressor = inputDecompressor;
+  }
+
+  /**
+   * Starts the "ShuffleRunner" thread that executes the scheduling loop
+   * (RunBroadcastShuffleCallable) wrapped in {@code runShuffleFuture}.
+   */
+  public void run() {
+    RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
+    runShuffleFuture = new FutureTask<Void>(callable);
+    // The thread must be started explicitly; merely constructing it never
+    // runs the scheduling loop, so no fetch would ever be scheduled.
+    Thread shuffleRunner = new Thread(runShuffleFuture, "ShuffleRunner");
+    shuffleRunner.start();
+  }
+
+  /**
+   * Scheduling loop run on the "ShuffleRunner" thread. Each pass waits until
+   * a fetcher slot is free and some host has pending inputs. It then starts
+   * fetchers for as many pending hosts as there are free slots. The loop ends
+   * once all {@code numInputs} inputs have completed or a shuffle error has
+   * been recorded; the fetcher pool is then shut down.
+   */
+  private class RunBroadcastShuffleCallable implements Callable<Void> {
+
+    @Override
+    public Void call() throws Exception {
+      while (numCompletedInputs.get() < numInputs) {
+        // wakeLoop is a Condition of the ReentrantLock "lock". It may only be
+        // awaited while holding that lock via lock()/unlock();
+        // synchronized(lock) does not acquire it.
+        lock.lock();
+        try {
+          // Re-check completion and errors under the lock. Otherwise a signal
+          // sent between the loop condition and the await could be missed,
+          // leaving the loop waiting forever.
+          if (numCompletedInputs.get() < numInputs && shuffleError == null
+              && (numRunningFetchers.get() >= numFetchers || pendingHosts.isEmpty())) {
+            wakeLoop.await();
+          }
+          if (shuffleError != null) {
+            // InputContext has already been informed of a fatal error.
+            // Initiate shutdown.
+            break;
+          }
+          // Schedule on every pass, not only after an await, so hosts that
+          // are already pending do not leave the loop spinning.
+          if (numCompletedInputs.get() < numInputs) {
+            startFetchersForPendingHosts();
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+      // TODO NEWTEZ Maybe clean up inputs.
+      if (!fetcherExecutor.isShutdown()) {
+        fetcherExecutor.shutdownNow();
+      }
+      return null;
+    }
+
+    /**
+     * Hands pending hosts to fetchers without exceeding numFetchers running
+     * fetchers. Hosts with no pending inputs are simply dropped from
+     * pendingHosts. Must be called while holding {@code lock}.
+     */
+    private void startFetchersForPendingHosts() {
+      int freeSlots = Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
+      int started = 0;
+      Iterator<InputHost> inputHostIter = pendingHosts.iterator();
+      // Check the slot count before starting a fetcher, so none is started
+      // when no slot is free.
+      while (started < freeSlots && inputHostIter.hasNext()) {
+        InputHost inputHost = inputHostIter.next();
+        inputHostIter.remove();
+        if (inputHost.getNumPendingInputs() > 0) {
+          Fetcher fetcher = constructFetcherForHost(inputHost);
+          numRunningFetchers.incrementAndGet();
+          ListenableFuture<FetchResult> future = fetcherExecutor.submit(fetcher);
+          Futures.addCallback(future, fetchFutureCallback);
+          started++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a Fetcher for all inputs currently pending on {@code inputHost}.
+   * The host's pending list is drained once. Inputs that have already
+   * completed, or that were marked obsolete, are filtered out of it. Obsolete
+   * entries are also removed from {@code obsoletedInputs}.
+   */
+  private Fetcher constructFetcherForHost(InputHost inputHost) {
+    FetcherBuilder fetcherBuilder = new FetcherBuilder(
+        BroadcastShuffleManager.this, inputManager,
+        inputContext.getApplicationId(), shuffleSecret, conf);
+    fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
+    fetcherBuilder.setCompressionParameters(codec, decompressor);
+
+    // Drain exactly once: a second clearAndGetPendingInputs() call would
+    // return the already-emptied list and lose the work filtered below.
+    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
+        .clearAndGetPendingInputs();
+    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
+        .iterator(); inputIter.hasNext();) {
+      InputAttemptIdentifier input = inputIter.next();
+      // Obsolete attempts are dropped and cleared from the obsolete set.
+      // else-if ensures an entry is removed from the list at most once.
+      if (obsoletedInputs.remove(input)) {
+        inputIter.remove();
+      } else if (completedInputSet.contains(input.getInputIdentifier())) {
+        // Avoid re-fetching attempts whose input has already completed.
+        inputIter.remove();
+      }
+    }
+    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
+    // fetcher, especially in the case where #hosts < #fetchers
+    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
+        pendingInputsForHost);
+    return fetcherBuilder.build();
+  }
+
+ /////////////////// Methods for InputEventHandler
+
+  /**
+   * Registers an input available on {@code hostName:port} and wakes the
+   * scheduling loop. One InputHost is kept per host name; putIfAbsent
+   * resolves concurrent creation.
+   *
+   * @param partition currently unused
+   */
+  public void addKnownInput(String hostName, int port,
+      InputAttemptIdentifier srcAttemptIdentifier, int partition) {
+    InputHost host = knownSrcHosts.get(hostName);
+    if (host == null) {
+      host = new InputHost(hostName, port, inputContext.getApplicationId());
+      InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
+      if (old != null) {
+        host = old;
+      }
+    }
+    host.addKnownInput(srcAttemptIdentifier);
+    // wakeLoop belongs to the ReentrantLock "lock"; signal() requires holding
+    // it via lock(), which synchronized(lock) does not do.
+    lock.lock();
+    try {
+      pendingHosts.add(host);
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Marks an input whose producer generated no output as complete. A
+   * NullFetchedInput is queued for it, unless the input has already
+   * completed. The scheduling loop is then woken so it can check for
+   * termination.
+   */
+  public void addCompletedInputWithNoData(
+      InputAttemptIdentifier srcAttemptIdentifier) {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    // Deduplicate on completedInputSet, exactly like fetchSucceeded. The
+    // previous gate on pendingInputs.remove() never passed: neither the
+    // constructor nor addKnownInput adds to pendingInputs. As a result,
+    // no-data inputs were never counted and the shuffle could not finish.
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          pendingInputs.remove(inputIdentifier);
+          completedInputSet.add(inputIdentifier);
+          completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
+          numCompletedInputs.incrementAndGet();
+        }
+      }
+    }
+
+    // Awake the loop to check for termination. The Condition requires the
+    // ReentrantLock to be held via lock().
+    lock.lock();
+    try {
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Records a source attempt as obsolete so it is left out when the input list
+   * for a new fetcher is built. Fetchers that are already running are not told.
+   *
+   * <p>NOTE(review): this synchronizes on {@code this}; confirm that the code
+   * reading and removing entries from obsoletedInputs uses the same monitor or
+   * a thread-safe set.
+   *
+   * @param srcAttemptIdentifier the attempt that should no longer be fetched
+   */
+  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+    this.obsoletedInputs.add(srcAttemptIdentifier);
+    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
+  }
+
+
+  /**
+   * Hands a batch of incoming events to the input event handler.
+   *
+   * @param events events to process
+   */
+  public void handleEvents(List<Event> events) {
+    this.inputEventHandler.handleEvents(events);
+  }
+
+ /////////////////// End of Methods for InputEventHandler
+ /////////////////// Methods from FetcherCallbackHandler
+
+  /**
+   * Called by a fetcher when it has copied a source input. Only the first copy
+   * of a given input is committed and queued for consumption; any later copy
+   * of the same input is aborted. Progress time is refreshed either way.
+   */
+  @Override
+  public void fetchSucceeded(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
+      long copyDuration) throws IOException {
+    InputIdentifier inputId = srcAttemptIdentifier.getInputIdentifier();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+    }
+
+    // Count irrespective of whether this is a copy of an already fetched input
+    synchronized (lock) {
+      lastProgressTime = System.currentTimeMillis();
+    }
+
+    boolean accepted = false;
+    // Unsynchronized pre-check, re-checked under the monitor so only one copy
+    // wins. NOTE(review): the outer contains() assumes completedInputSet is a
+    // thread-safe set — confirm against its declaration.
+    if (!completedInputSet.contains(inputId)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputId)) {
+          fetchedInput.commit();
+          accepted = true;
+          pendingInputs.remove(inputId);
+          completedInputSet.add(inputId);
+          completedInputs.add(fetchedInput);
+          numCompletedInputs.incrementAndGet();
+        }
+      }
+    }
+
+    if (accepted) {
+      synchronized (lock) {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      }
+    } else {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
+    }
+    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
+  }
+
+  /**
+   * Called by a fetcher when copying a source input failed. The failure is
+   * reported immediately as an {@link InputReadErrorEvent} naming the source
+   * task index and attempt number. {@code host} and {@code connectFailed} are
+   * not used yet.
+   */
+  @Override
+  public void fetchFailed(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
+    // For now, reporting immediately.
+    int srcTaskIndex = srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex();
+    int attemptNumber = srcAttemptIdentifier.getAttemptNumber();
+    InputReadErrorEvent readError = new InputReadErrorEvent(
+        "Fetch failure while fetching from "
+            + TezEngineUtils.getTaskAttemptIdentifier(
+                inputContext.getSourceVertexName(), srcTaskIndex, attemptNumber),
+        srcTaskIndex, attemptNumber);
+
+    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+    failedEvents.add(readError);
+    inputContext.sendEvents(failedEvents);
+  }
+ /////////////////// End of Methods from FetcherCallbackHandler
+
+ public void shutdown() throws InterruptedException {
+ if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
+ this.fetcherExecutor.shutdown();
+ this.fetcherExecutor.awaitTermination(2000l, TimeUnit.MILLISECONDS);
+ if (!this.fetcherExecutor.isShutdown()) {
+ this.fetcherExecutor.shutdownNow();
+ }
+ }
+ }
+
+ /////////////////// Methods for walking the available inputs
+
+  /**
+   * Reports whether a real input can be consumed without blocking.
+   * NullFetchedInput placeholders (inputs that produced no data) are skipped,
+   * so a placeholder at the head of the queue does not hide real inputs
+   * queued behind it.
+   *
+   * @return true if there is another input ready for consumption.
+   */
+  public boolean newInputAvailable() {
+    for (FetchedInput queued : completedInputs) {
+      if (!(queued instanceof NullFetchedInput)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether every expected input has completed, either by a successful
+   * fetch or by being marked as having no data.
+   *
+   * @return true if the completed-input count equals the expected input count
+   */
+  public boolean allInputsFetched() {
+    return numInputs == numCompletedInputs.get();
+  }
+
+  /**
+   * Takes the next real input from the completed queue, discarding
+   * NullFetchedInput placeholders along the way. Blocks while the queue is
+   * empty and more inputs are still expected.
+   *
+   * @return the next available input, or null once the queue is empty and
+   *         all inputs have completed
+   * @throws InterruptedException if interrupted while blocked waiting
+   */
+  public FetchedInput getNextInput() throws InterruptedException {
+    while (true) {
+      FetchedInput next = completedInputs.peek();
+      if (next != null) {
+        next = completedInputs.poll();
+      } else if (allInputsFetched()) {
+        return null;
+      } else {
+        next = completedInputs.take(); // block
+      }
+      if (!(next instanceof NullFetchedInput)) {
+        return next;
+      }
+    }
+  }
+
+ /////////////////// End of methods for walking the available inputs
+
+
+  /**
+   * Placeholder added to the completed-input queue when a source input has no
+   * data, so its completion flows through the same queue as real inputs.
+   * {@code getNextInput()} skips it and {@code newInputAvailable()} does not
+   * report it as available. Every data or lifecycle operation throws
+   * {@link UnsupportedOperationException}.
+   *
+   * <p>Declared static because it never uses the enclosing manager, so it does
+   * not need to hold a hidden reference to it.
+   */
+  private static class NullFetchedInput extends FetchedInput {
+
+    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
+      super(Type.MEMORY, -1, inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void commit() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void abort() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void free() {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+  }
+
+
+  /**
+   * Completion callback attached to each fetcher's future. On success it
+   * re-queues inputs the fetcher did not get to. On failure it records the
+   * error and reports a fatal error to the input context. In both cases it
+   * updates the running-fetcher count and wakes the scheduling loop.
+   */
+  private class FetchFutureCallback implements FutureCallback<FetchResult> {
+
+    /** Decrements the running-fetcher count and wakes the scheduling loop. */
+    private void doBookKeepingForFetcherComplete() {
+      numRunningFetchers.decrementAndGet();
+      // NOTE(review): if `lock` is a java.util.concurrent.locks.Lock and
+      // wakeLoop one of its Conditions, signal() requires lock.lock() rather
+      // than the monitor taken by synchronized — confirm the field declarations.
+      synchronized (lock) {
+        wakeLoop.signal();
+      }
+    }
+
+    @Override
+    public void onSuccess(FetchResult result) {
+      // Named to avoid shadowing the manager's pendingInputs field.
+      Iterable<InputAttemptIdentifier> unfetched = result.getPendingInputs();
+      if (unfetched != null && unfetched.iterator().hasNext()) {
+        InputHost inputHost = knownSrcHosts.get(result.getHost());
+        assert inputHost != null;
+        for (InputAttemptIdentifier input : unfetched) {
+          inputHost.addKnownInput(input);
+        }
+        // Publish the host under the same monitor that addKnownInput() uses
+        // when it adds to pendingHosts.
+        synchronized (lock) {
+          pendingHosts.add(inputHost);
+        }
+      }
+      doBookKeepingForFetcherComplete();
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // Pass the throwable separately so its stack trace is logged.
+      LOG.error("Fetcher failed with error", t);
+      shuffleError = t;
+      inputContext.fatalError(t, "Fetcher failed");
+      doBookKeepingForFetcherComplete();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
new file mode 100644
index 0000000..9f3dbbe
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
@@ -0,0 +1,125 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.broadcast.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
+import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.TezOutputContext;
+
+/**
+ * Writes key/value pairs, without sorting or aggregation, to a single local
+ * IFile. On {@link #close()} it also writes a one-record index file describing
+ * that output.
+ */
+public class FileBasedKVWriter implements KVWriter {
+
+  // Length passed when requesting the index file path. NOTE(review):
+  // presumably one TezIndexRecord serialized as three 8-byte longs — confirm
+  // against TezSpillRecord.
+  public static final int INDEX_RECORD_LENGTH = 24;
+
+  private final Configuration conf;
+  // long rather than int so a very large output cannot overflow to a
+  // non-positive value and make close() report that nothing was written.
+  private long numRecords = 0;
+
+  @SuppressWarnings("rawtypes")
+  private final Class keyClass;
+  @SuppressWarnings("rawtypes")
+  private final Class valClass;
+  private final CompressionCodec codec;
+  private final FileSystem rfs;
+  private IFile.Writer writer;
+
+  private final TezTaskOutput outputFileManager;
+
+  // TODO NEWTEZ Define Counters
+  // Number of records
+  // Time waiting for a write to complete, if that's possible.
+  // Size of key-value pairs written.
+
+  /**
+   * Configures the writer and creates the underlying IFile writer.
+   * <ul>
+   *   <li>The configuration comes from the output's user payload, with local
+   *       dirs set to the context's work dirs.</li>
+   *   <li>Key/value classes and the optional compression codec are read from
+   *       that configuration.</li>
+   *   <li>The IFile writer is created immediately via {@link #initWriter()}.</li>
+   * </ul>
+   *
+   * @param outputContext context of the output being written
+   * @throws IOException if the local file system or the output file cannot be set up
+   */
+  public FileBasedKVWriter(TezOutputContext outputContext) throws IOException {
+    this.conf = TezUtils.createConfFromUserPayload(outputContext
+        .getUserPayload());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
+        outputContext.getWorkDirs());
+
+    this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
+
+    // Setup serialization
+    this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+    this.valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+
+    // Setup compression
+    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
+      Class<? extends CompressionCodec> codecClass = ConfigUtils
+          .getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
+      this.codec = ReflectionUtils.newInstance(codecClass, this.conf);
+    } else {
+      this.codec = null;
+    }
+
+    this.outputFileManager = TezEngineUtils.instantiateTaskOutputManager(conf,
+        outputContext);
+
+    initWriter();
+  }
+
+  /**
+   * Closes the data file and writes an index file holding one record: offset 0
+   * plus the writer's raw and compressed lengths.
+   *
+   * @return true if any output was generated. false otherwise
+   * @throws IOException if closing the data file or writing the index fails
+   */
+  public boolean close() throws IOException {
+    this.writer.close();
+    TezIndexRecord rec = new TezIndexRecord(0, writer.getRawLength(),
+        writer.getCompressedLength());
+    TezSpillRecord sr = new TezSpillRecord(1);
+    sr.putIndex(rec, 0);
+
+    Path indexFile = outputFileManager
+        .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
+    sr.writeToFile(indexFile, conf);
+    return numRecords > 0;
+  }
+
+  /** Appends one key/value pair to the output file. */
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    this.writer.append(key, value);
+    numRecords++;
+  }
+
+  /**
+   * Creates the IFile writer for this task's output file.
+   * NOTE(review): this is public, non-final and called from the constructor, so
+   * a subclass override would run before the subclass is initialized.
+   */
+  public void initWriter() throws IOException {
+    Path outputFile = outputFileManager.getOutputFileForWrite();
+
+    // TODO NEWTEZ Consider making the buffer size configurable. Also consider
+    // setting up an in-memory buffer which is occasionally flushed to disk so
+    // that the output does not block.
+
+    // TODO NEWTEZ maybe use appropriate counter
+    this.writer = new IFile.Writer(conf, rfs, outputFile, keyClass, valClass,
+        codec, null);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
new file mode 100644
index 0000000..076807e
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Identifies one attempt of a source input: the {@link InputIdentifier} (the
+ * source task index) together with an attempt number. An optional path
+ * component travels with it but is not part of equality or hashing.
+ *
+ * <p>All fields are final, so an instance is immutable once constructed and
+ * can be safely shared between threads, for example as a key in hash-based
+ * collections.
+ */
+@Private
+public class InputAttemptIdentifier {
+
+  private final InputIdentifier inputIdentifier;
+  private final int attemptNumber;
+  private final String pathComponent;
+
+  /** Creates an identifier with no path component. */
+  public InputAttemptIdentifier(int taskIndex, int attemptNumber) {
+    this(new InputIdentifier(taskIndex), attemptNumber, null);
+  }
+
+  /**
+   * @param inputIdentifier the source input this attempt belongs to
+   * @param attemptNumber attempt number of the source task
+   * @param pathComponent optional path component; may be null
+   */
+  public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
+    this.inputIdentifier = inputIdentifier;
+    this.attemptNumber = attemptNumber;
+    this.pathComponent = pathComponent;
+  }
+
+  /** Convenience constructor that wraps {@code taskIndex} in an {@link InputIdentifier}. */
+  public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
+    this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
+  }
+
+  public InputIdentifier getInputIdentifier() {
+    return this.inputIdentifier;
+  }
+
+  public int getAttemptNumber() {
+    return attemptNumber;
+  }
+
+  /** @return the path component, or null if none was supplied */
+  public String getPathComponent() {
+    return pathComponent;
+  }
+
+  // PathComponent does not need to be part of the hashCode and equals computation.
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + attemptNumber;
+    result = prime * result
+        + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    InputAttemptIdentifier other = (InputAttemptIdentifier) obj;
+    if (attemptNumber != other.attemptNumber) {
+      return false;
+    }
+    if (inputIdentifier == null) {
+      return other.inputIdentifier == null;
+    }
+    return inputIdentifier.equals(other.inputIdentifier);
+  }
+
+  @Override
+  public String toString() {
+    return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
+        + ", attemptNumber=" + attemptNumber + ", pathComponent="
+        + pathComponent + "]";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
new file mode 100644
index 0000000..b694530
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common;
+
+/**
+ * Identifies a consumer input by the index of the source task that produces
+ * it. The attempt number is not part of this identity; it is tracked
+ * separately by InputAttemptIdentifier.
+ */
+public class InputIdentifier {
+
+  private final int srcTaskIndex;
+
+  /** @param srcTaskIndex index of the source task that produces this input */
+  public InputIdentifier(int srcTaskIndex) {
+    this.srcTaskIndex = srcTaskIndex;
+  }
+
+  public int getSrcTaskIndex() {
+    return this.srcTaskIndex;
+  }
+
+  /** The source task index itself serves as the hash code. */
+  @Override
+  public int hashCode() {
+    return this.srcTaskIndex;
+  }
+
+  /** Two identifiers are equal when they have the same concrete class and source task index. */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    InputIdentifier that = (InputIdentifier) obj;
+    return this.srcTaskIndex == that.srcTaskIndex;
+  }
+
+  @Override
+  public String toString() {
+    return "InputIdentifier [srcTaskIndex=" + srcTaskIndex + "]";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index da333a2..f352e08 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -25,8 +25,13 @@ import java.lang.reflect.InvocationTargetException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.engine.api.Partitioner;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.newapi.TezOutputContext;
public class TezEngineUtils {
@@ -90,4 +95,20 @@ public class TezEngineUtils {
}
return partitioner;
}
+
+  /**
+   * Creates the task output manager named by
+   * {@code Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER} (default
+   * {@link TezTaskOutputFiles}). The class's public
+   * {@code (Configuration, String)} constructor is called with the
+   * configuration and the output's unique identifier.
+   *
+   * @throws TezUncheckedException if the class cannot be instantiated or is not
+   *         a {@link TezTaskOutput}
+   */
+  public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
+    Class<?> managerClass = conf.getClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        TezTaskOutputFiles.class);
+    try {
+      Constructor<?> constructor =
+          managerClass.getConstructor(Configuration.class, String.class);
+      // Allows instantiation even if the configured class itself is not public.
+      constructor.setAccessible(true);
+      Object created = constructor.newInstance(conf, outputContext.getUniqueIdentifier());
+      return (TezTaskOutput) created;
+    } catch (Exception e) {
+      String configuredName = conf.get(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+          TezTaskOutputFiles.class.getName());
+      throw new TezUncheckedException(
+          "Unable to instantiate configured TezOutputFileManager: "
+              + configuredName, e);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
index 0440236..5c71644 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
import org.apache.tez.engine.common.security.SecureShuffleUtils;
import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
import org.apache.tez.engine.common.sort.impl.IFileInputStream;
@@ -217,7 +218,7 @@ class Fetcher extends Thread {
@VisibleForTesting
protected void copyFromHost(MapHost host) throws IOException {
// Get completed maps on 'host'
- List<TaskAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
+ List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
@@ -231,7 +232,7 @@ class Fetcher extends Thread {
}
// List of maps to be fetched yet
- Set<TaskAttemptIdentifier> remaining = new HashSet<TaskAttemptIdentifier>(srcAttempts);
+ Set<InputAttemptIdentifier> remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
// Construct the url and connect
DataInputStream input;
@@ -290,19 +291,20 @@ class Fetcher extends Thread {
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
if (!connectSucceeded) {
- for(TaskAttemptIdentifier left: remaining) {
+ for(InputAttemptIdentifier left: remaining) {
scheduler.copyFailed(left, host, connectSucceeded);
}
} else {
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically lost map. So, penalize only that map
// and add the rest
- TaskAttemptIdentifier firstMap = srcAttempts.get(0);
+ InputAttemptIdentifier firstMap = srcAttempts.get(0);
scheduler.copyFailed(firstMap, host, connectSucceeded);
}
// Add back all the remaining maps, WITHOUT marking them as failed
- for(TaskAttemptIdentifier left: remaining) {
+ for(InputAttemptIdentifier left: remaining) {
+ // TODO Should the first one be skipped ?
scheduler.putBackKnownMapOutput(host, left);
}
@@ -314,14 +316,14 @@ class Fetcher extends Thread {
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
- TaskAttemptIdentifier[] failedTasks = null;
+ InputAttemptIdentifier[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
failedTasks = copyMapOutput(host, input, remaining);
}
if(failedTasks != null && failedTasks.length > 0) {
LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
- for(TaskAttemptIdentifier left: failedTasks) {
+ for(InputAttemptIdentifier left: failedTasks) {
scheduler.copyFailed(left, host, true);
}
}
@@ -334,19 +336,19 @@ class Fetcher extends Thread {
+ remaining.size() + " left.");
}
} finally {
- for (TaskAttemptIdentifier left : remaining) {
+ for (InputAttemptIdentifier left : remaining) {
scheduler.putBackKnownMapOutput(host, left);
}
}
}
- private static TaskAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptIdentifier[0];
+ private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
- private TaskAttemptIdentifier[] copyMapOutput(MapHost host,
+ private InputAttemptIdentifier[] copyMapOutput(MapHost host,
DataInputStream input,
- Set<TaskAttemptIdentifier> remaining) {
+ Set<InputAttemptIdentifier> remaining) {
MapOutput mapOutput = null;
- TaskAttemptIdentifier srcAttemptId = null;
+ InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = -1;
long compressedLength = -1;
@@ -366,14 +368,14 @@ class Fetcher extends Thread {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
//Don't know which one was bad, so consider all of them as bad
- return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
+ return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
}
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, srcAttemptId)) {
- return new TaskAttemptIdentifier[] {srcAttemptId};
+ return new InputAttemptIdentifier[] {srcAttemptId};
}
if(LOG.isDebugEnabled()) {
@@ -418,9 +420,9 @@ class Fetcher extends Thread {
srcAttemptId + " decomp: " +
decompressedLength + ", " + compressedLength, ioe);
if(srcAttemptId == null) {
- return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
+ return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
} else {
- return new TaskAttemptIdentifier[] {srcAttemptId};
+ return new InputAttemptIdentifier[] {srcAttemptId};
}
}
@@ -430,7 +432,7 @@ class Fetcher extends Thread {
// Inform the shuffle-scheduler
mapOutput.abort();
metrics.failedFetch();
- return new TaskAttemptIdentifier[] {srcAttemptId};
+ return new InputAttemptIdentifier[] {srcAttemptId};
}
}
@@ -445,7 +447,7 @@ class Fetcher extends Thread {
* @return true/false, based on if the verification succeeded or not
*/
private boolean verifySanity(long compressedLength, long decompressedLength,
- int forReduce, Set<TaskAttemptIdentifier> remaining, TaskAttemptIdentifier srcAttemptId) {
+ int forReduce, Set<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
if (compressedLength < 0 || decompressedLength < 0) {
wrongLengthErrs.increment(1);
LOG.warn(getName() + " invalid lengths in map output header: id: " +
@@ -482,13 +484,13 @@ class Fetcher extends Thread {
* @return
* @throws MalformedURLException
*/
- private URL getMapOutputURL(MapHost host, List<TaskAttemptIdentifier> srcAttempts
+ private URL getMapOutputURL(MapHost host, List<InputAttemptIdentifier> srcAttempts
) throws MalformedURLException {
// Get the base url
StringBuffer url = new StringBuffer(host.getBaseUrl());
boolean first = true;
- for (TaskAttemptIdentifier mapId : srcAttempts) {
+ for (InputAttemptIdentifier mapId : srcAttempts) {
if (!first) {
url.append(",");
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
index d10ebaa..1beed44 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
import org.apache.tez.engine.common.sort.impl.IFile;
import org.apache.tez.engine.common.sort.impl.IFile.Reader;
@@ -34,14 +35,14 @@ import org.apache.tez.engine.common.sort.impl.IFile.Reader;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InMemoryReader extends Reader {
- private final TaskAttemptIdentifier taskAttemptId;
+ private final InputAttemptIdentifier taskAttemptId;
private final MergeManager merger;
DataInputBuffer memDataIn = new DataInputBuffer();
private int start;
private int length;
private int prevKeyPos;
- public InMemoryReader(MergeManager merger, TaskAttemptIdentifier taskAttemptId,
+ public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
byte[] data, int start, int length)
throws IOException {
super(null, null, length - start, null, null);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
index cd644de..20ec472 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
@@ -20,6 +20,10 @@ package org.apache.tez.engine.common.shuffle.impl;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
+
+@Private
class MapHost {
public static enum State {
@@ -35,7 +39,7 @@ class MapHost {
private final String baseUrl;
private final String identifier;
// Tracks attempt IDs
- private List<TaskAttemptIdentifier> maps = new ArrayList<TaskAttemptIdentifier>();
+ private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
public MapHost(int partitionId, String hostName, String baseUrl) {
this.partitionId = partitionId;
@@ -68,16 +72,16 @@ class MapHost {
return baseUrl;
}
- public synchronized void addKnownMap(TaskAttemptIdentifier srcAttempt) {
+ public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
maps.add(srcAttempt);
if (state == State.IDLE) {
state = State.PENDING;
}
}
- public synchronized List<TaskAttemptIdentifier> getAndClearKnownMaps() {
- List<TaskAttemptIdentifier> currentKnownMaps = maps;
- maps = new ArrayList<TaskAttemptIdentifier>();
+ public synchronized List<InputAttemptIdentifier> getAndClearKnownMaps() {
+ List<InputAttemptIdentifier> currentKnownMaps = maps;
+ maps = new ArrayList<InputAttemptIdentifier>();
return currentKnownMaps;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
index f0b48fd..46851c7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
@@ -42,7 +43,7 @@ class MapOutput {
DISK
}
- private TaskAttemptIdentifier attemptIdentifier;
+ private InputAttemptIdentifier attemptIdentifier;
private final int id;
private final MergeManager merger;
@@ -61,7 +62,7 @@ class MapOutput {
private final boolean primaryMapOutput;
- MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, long size,
+ MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, long size,
Configuration conf, LocalDirAllocator localDirAllocator,
int fetcher, boolean primaryMapOutput,
TezTaskOutputFiles mapOutputFile)
@@ -79,7 +80,7 @@ class MapOutput {
this.localFS = FileSystem.getLocal(conf);
outputPath =
- mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getTaskIndex(), size);
+ mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
disk = localFS.create(tmpOutputPath);
@@ -87,7 +88,7 @@ class MapOutput {
this.primaryMapOutput = primaryMapOutput;
}
- MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, int size,
+ MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, int size,
boolean primaryMapOutput) {
this.id = ID.incrementAndGet();
this.attemptIdentifier = attemptIdentifier;
@@ -107,7 +108,7 @@ class MapOutput {
this.primaryMapOutput = primaryMapOutput;
}
- public MapOutput(TaskAttemptIdentifier attemptIdentifier) {
+ public MapOutput(InputAttemptIdentifier attemptIdentifier) {
this.id = ID.incrementAndGet();
this.attemptIdentifier = attemptIdentifier;
@@ -159,7 +160,7 @@ class MapOutput {
return disk;
}
- public TaskAttemptIdentifier getAttemptIdentifier() {
+ public InputAttemptIdentifier getAttemptIdentifier() {
return this.attemptIdentifier;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index a5401fa..093a293 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -47,6 +47,7 @@ import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
import org.apache.tez.engine.common.sort.impl.IFile;
import org.apache.tez.engine.common.sort.impl.TezMerger;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
@@ -232,7 +233,7 @@ public class MergeManager {
final private MapOutput stallShuffle = new MapOutput(null);
- public synchronized MapOutput reserve(TaskAttemptIdentifier srcAttemptIdentifier,
+ public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier,
long requestedSize,
int fetcher
) throws IOException {
@@ -279,7 +280,7 @@ public class MergeManager {
* @return
*/
private synchronized MapOutput unconditionalReserve(
- TaskAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
+ InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
usedMemory += requestedSize;
return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize,
primaryMapOutput);
@@ -409,7 +410,7 @@ public class MergeManager {
return;
}
- TaskAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
+ InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
@@ -468,17 +469,16 @@ public class MergeManager {
//in the merge method)
//figure out the mapId
- TaskAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
+ InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
- Path outputPath =
- mapOutputFile.getInputFileForWrite(srcTaskIdentifier.getTaskIndex(),
- mergeOutputSize).suffix(
- Constants.MERGED_OUTPUT_PREFIX);
+ Path outputPath = mapOutputFile.getInputFileForWrite(
+ srcTaskIdentifier.getInputIdentifier().getSrcTaskIndex(),
+ mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
Writer writer = null;
try {
@@ -696,7 +696,7 @@ public class MergeManager {
long inMemToDiskBytes = 0;
boolean mergePhaseFinished = false;
if (inMemoryMapOutputs.size() > 0) {
- int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getTaskIndex();
+ int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getSrcTaskIndex();
inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
memDiskSegments,
maxInMemReduce);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 7c41d3d..620c620 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -181,6 +181,7 @@ public class Shuffle implements ExceptionReporter {
private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
@Override
public TezRawKeyValueIterator call() throws IOException, InterruptedException {
+ // TODO NEWTEZ Limit # fetchers to number of inputs
final int numFetchers =
conf.getInt(
TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
index ec5db5b..a918ef1 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
@@ -62,6 +62,14 @@ public class ShuffleHeader implements Writable {
this.forReduce = forReduce;
}
+ public String getMapId() {
+ return this.mapId;
+ }
+
+ public int getPartition() {
+ return this.forReduce;
+ }
+
public long getUncompressedLength() {
return uncompressedLength;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
index b31d36c..98c7ab1 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.InputInformationEventPayloadProto;
import org.apache.tez.engine.newapi.Event;
@@ -95,7 +96,7 @@ public class ShuffleInputEventHandler {
int partitionId = dmEvent.getSourceIndex();
URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
- TaskAttemptIdentifier srcAttemptIdentifier = new TaskAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+ InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
// TODO NEWTEZ See if this duration hack can be removed.
@@ -107,7 +108,7 @@ public class ShuffleInputEventHandler {
}
private void processTaskFailedEvent(InputFailedEvent ifEvent) {
- TaskAttemptIdentifier taIdentifier = new TaskAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
+ InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
scheduler.obsoleteMapOutput(taIdentifier);
LOG.info("Obsoleting output of src-task: " + taIdentifier);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
index 964533d..acbd054 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
import org.apache.tez.engine.common.TezEngineUtils;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezInputContext;
@@ -63,17 +64,17 @@ class ShuffleScheduler {
private final Map<Integer, MutableInt> finishedMaps;
private final int numInputs;
private int remainingMaps;
- private Map<TaskAttemptIdentifier, MapHost> mapLocations = new HashMap<TaskAttemptIdentifier, MapHost>();
+ private Map<InputAttemptIdentifier, MapHost> mapLocations = new HashMap<InputAttemptIdentifier, MapHost>();
//TODO NEWTEZ Clean this and other maps at some point
- private ConcurrentMap<String, TaskAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, TaskAttemptIdentifier>();
+ private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
private Set<MapHost> pendingHosts = new HashSet<MapHost>();
- private Set<TaskAttemptIdentifier> obsoleteMaps = new HashSet<TaskAttemptIdentifier>();
+ private Set<InputAttemptIdentifier> obsoleteMaps = new HashSet<InputAttemptIdentifier>();
private final Random random = new Random(System.currentTimeMillis());
private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
private final Referee referee = new Referee();
- private final Map<TaskAttemptIdentifier, IntWritable> failureCounts =
- new HashMap<TaskAttemptIdentifier,IntWritable>();
+ private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
+ new HashMap<InputAttemptIdentifier,IntWritable>();
private final Map<String,IntWritable> hostFailures =
new HashMap<String,IntWritable>();
private final TezInputContext inputContext;
@@ -127,19 +128,19 @@ class ShuffleScheduler {
TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
}
- public synchronized void copySucceeded(TaskAttemptIdentifier srcAttemptIdentifier,
+ public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier,
MapHost host,
long bytes,
long milis,
MapOutput output
) throws IOException {
- String taskIdentifier = TezEngineUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
+ String taskIdentifier = TezEngineUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
failureCounts.remove(taskIdentifier);
hostFailures.remove(host.getHostName());
- if (!isFinishedTaskTrue(srcAttemptIdentifier.getTaskIndex())) {
+ if (!isFinishedTaskTrue(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
output.commit();
- if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getTaskIndex())) {
+ if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
shuffledMapsCounter.increment(1);
if (--remainingMaps == 0) {
notifyAll();
@@ -154,10 +155,11 @@ class ShuffleScheduler {
if (LOG.isDebugEnabled()) {
LOG.debug("src task: "
+ TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttemptIdentifier.getTaskIndex(),
+ inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
srcAttemptIdentifier.getAttemptNumber()) + " done");
}
}
+ // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
}
private void logProgress() {
@@ -170,7 +172,7 @@ class ShuffleScheduler {
+ mbpsFormat.format(transferRate) + " MB/s)");
}
- public synchronized void copyFailed(TaskAttemptIdentifier srcAttempt,
+ public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
MapHost host,
boolean readError) {
host.penalize();
@@ -194,7 +196,7 @@ class ShuffleScheduler {
throw new IOException(failures
+ " failures downloading "
+ TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+ inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
srcAttempt.getAttemptNumber()));
} catch (IOException ie) {
shuffle.reportException(ie);
@@ -217,20 +219,20 @@ class ShuffleScheduler {
// after every read error, if 'reportReadErrorImmediately' is true or
// after every 'maxFetchFailuresBeforeReporting' failures
private void checkAndInformJobTracker(
- int failures, TaskAttemptIdentifier srcAttempt, boolean readError) {
+ int failures, InputAttemptIdentifier srcAttempt, boolean readError) {
if ((reportReadErrorImmediately && readError)
|| ((failures % maxFetchFailuresBeforeReporting) == 0)) {
LOG.info("Reporting fetch failure for "
+ TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+ inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
srcAttempt.getAttemptNumber()) + " to jobtracker.");
List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
failedEvents.add(new InputReadErrorEvent("Fetch failure for "
+ TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
- srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt
- .getTaskIndex(), srcAttempt.getAttemptNumber()));
+ inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
+ srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
+ .getSrcTaskIndex(), srcAttempt.getAttemptNumber()));
inputContext.sendEvents(failedEvents);
//status.addFailedDependency(mapId);
@@ -299,7 +301,7 @@ class ShuffleScheduler {
public synchronized void addKnownMapOutput(String hostName,
int partitionId,
String hostUrl,
- TaskAttemptIdentifier srcAttempt) {
+ InputAttemptIdentifier srcAttempt) {
String identifier = MapHost.createIdentifier(hostName, partitionId);
MapHost host = mapLocations.get(identifier);
if (host == null) {
@@ -317,13 +319,13 @@ class ShuffleScheduler {
}
}
- public synchronized void obsoleteMapOutput(TaskAttemptIdentifier srcAttempt) {
+ public synchronized void obsoleteMapOutput(InputAttemptIdentifier srcAttempt) {
// The incoming srcAttempt does not contain a path component.
obsoleteMaps.add(srcAttempt);
}
public synchronized void putBackKnownMapOutput(MapHost host,
- TaskAttemptIdentifier srcAttempt) {
+ InputAttemptIdentifier srcAttempt) {
host.addKnownMap(srcAttempt);
}
@@ -349,20 +351,20 @@ class ShuffleScheduler {
return host;
}
- public TaskAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
+ public InputAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
return pathToIdentifierMap.get(pathComponent);
}
- public synchronized List<TaskAttemptIdentifier> getMapsForHost(MapHost host) {
- List<TaskAttemptIdentifier> list = host.getAndClearKnownMaps();
- Iterator<TaskAttemptIdentifier> itr = list.iterator();
- List<TaskAttemptIdentifier> result = new ArrayList<TaskAttemptIdentifier>();
+ public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
+ List<InputAttemptIdentifier> list = host.getAndClearKnownMaps();
+ Iterator<InputAttemptIdentifier> itr = list.iterator();
+ List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
int includedMaps = 0;
int totalSize = list.size();
// find the maps that we still need, up to the limit
while (itr.hasNext()) {
- TaskAttemptIdentifier id = itr.next();
- if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
+ InputAttemptIdentifier id = itr.next();
+ if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
result.add(id);
if (++includedMaps >= MAX_MAPS_AT_ONCE) {
break;
@@ -371,8 +373,8 @@ class ShuffleScheduler {
}
// put back the maps left after the limit
while (itr.hasNext()) {
- TaskAttemptIdentifier id = itr.next();
- if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
+ InputAttemptIdentifier id = itr.next();
+ if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
host.addKnownMap(id);
}
}