You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/06 23:34:07 UTC
git commit: TEZ-910. Allow ShuffledUnorderedKVInput to work for cases
other than broadcast. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 0df108154 -> 1b170a4ec
TEZ-910. Allow ShuffledUnorderedKVInput to work for cases other than
broadcast. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1b170a4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1b170a4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1b170a4e
Branch: refs/heads/master
Commit: 1b170a4ec0027cdb7dd5d5e28341187623df8658
Parents: 0df1081
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 6 14:30:58 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 6 14:30:58 2014 -0800
----------------------------------------------------------------------
.../BroadcastShuffleInputEventHandler.java | 154 -------------------
.../library/input/ShuffledUnorderedKVInput.java | 9 +-
.../library/shuffle/common/InputHost.java | 40 +++--
.../impl/ShuffleInputEventHandlerImpl.java | 151 ++++++++++++++++++
.../shuffle/common/impl/ShuffleManager.java | 11 +-
5 files changed, 194 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
deleted file mode 100644
index 68aa49f..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-public class BroadcastShuffleInputEventHandler implements ShuffleEventHandler {
-
- private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
-
- private final ShuffleManager shuffleManager;
- private final FetchedInputAllocator inputAllocator;
- private final CompressionCodec codec;
- private final boolean ifileReadAhead;
- private final int ifileReadAheadLength;
-
-
- public BroadcastShuffleInputEventHandler(TezInputContext inputContext,
- ShuffleManager shuffleManager,
- FetchedInputAllocator inputAllocator, CompressionCodec codec,
- boolean ifileReadAhead, int ifileReadAheadLength) {
- this.shuffleManager = shuffleManager;
- this.inputAllocator = inputAllocator;
- this.codec = codec;
- this.ifileReadAhead = ifileReadAhead;
- this.ifileReadAheadLength = ifileReadAheadLength;
- }
-
- @Override
- public void handleEvents(List<Event> events) throws IOException {
- for (Event event : events) {
- handleEvent(event);
- }
- }
-
- private void handleEvent(Event event) throws IOException {
- if (event instanceof DataMovementEvent) {
- processDataMovementEvent((DataMovementEvent)event);
- } else if (event instanceof InputFailedEvent) {
- processInputFailedEvent((InputFailedEvent)event);
- } else {
- throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
- }
- }
-
-
- private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
- Preconditions.checkArgument(dme.getSourceIndex() == 0,
- "Unexpected srcIndex: " + dme.getSourceIndex()
- + " on DataMovementEvent. Can only be 0");
- DataMovementEventPayloadProto shufflePayload;
- try {
- shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
- } catch (InvalidProtocolBufferException e) {
- throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
- }
- LOG.info("Processing DataMovementEvent with srcIndex: "
- + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
- + ", attemptNum: " + dme.getVersion() + ", payload: "
- + stringify(shufflePayload));
- if (shufflePayload.getOutputGenerated()) {
- InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
- dme.getTargetIndex(), dme.getVersion(),
- shufflePayload.getPathComponent());
- if (shufflePayload.hasData()) {
- DataProto dataProto = shufflePayload.getData();
- FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
- moveDataToFetchedInput(dataProto, fetchedInput);
- shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
- } else {
- shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);
- }
- } else {
- shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
- }
- }
-
- private void moveDataToFetchedInput(DataProto dataProto,
- FetchedInput fetchedInput) throws IOException {
- switch (fetchedInput.getType()) {
- case DISK:
- ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, dataProto
- .getData().newInput(), dataProto.getCompressedLength(), LOG);
- break;
- case MEMORY:
- ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
- dataProto.getData().newInput(), dataProto.getRawLength(),
- dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG);
- break;
- case WAIT:
- default:
- throw new TezUncheckedException("Unexpected type: "
- + fetchedInput.getType());
- }
- }
-
- private void processInputFailedEvent(InputFailedEvent ife) {
- InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
- shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
- }
-
- private String stringify(DataMovementEventPayloadProto dmProto) {
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- sb.append("outputGenerated: " + dmProto.getOutputGenerated()).append(", ");
- sb.append("host: " + dmProto.getHost()).append(", ");
- sb.append("port: " + dmProto.getPort()).append(", ");
- sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
- sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
- sb.append("hasData: " + dmProto.hasData());
- return sb.toString();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index adbeff8..c740748 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -41,9 +41,10 @@ import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleInputEventHandler;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.readers.ShuffledUnorderedKVReader;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleInputEventHandlerImpl;
import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
@@ -65,7 +66,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
private TezCounter inputRecordCounter;
private SimpleFetchedInputAllocator inputManager;
- private BroadcastShuffleInputEventHandler inputEventHandler;
+ private ShuffleEventHandler inputEventHandler;
private volatile long initialMemoryAvailable = -1;
@@ -129,7 +130,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
inputManager.setInitialMemoryAvailable(initialMemoryAvailable);
inputManager.configureAndStart();
- this.inputEventHandler = new BroadcastShuffleInputEventHandler(
+ this.inputEventHandler = new ShuffleInputEventHandlerImpl(
inputContext, shuffleManager, inputManager, codec, ifileReadAhead,
ifileReadAheadLength);
@@ -231,4 +232,4 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
ifileReadAheadLength, ifileBufferSize, inputRecordCounter);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
index 7905e27..59da655 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
@@ -26,16 +26,24 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+/**
+ * Represents a Host with respect to the MapReduce ShuffleHandler.
+ *
+ * srcPhysicalIndex / partition is part of this since the ShuffleHandler only
+ * knows how to serve one partition at a time.
+ */
public class InputHost {
private final String host;
private final int port;
+ private final int srcPhysicalIndex;
private final BlockingQueue<InputAttemptIdentifier> inputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
- public InputHost(String hostName, int port, ApplicationId appId) {
+ public InputHost(String hostName, int port, ApplicationId appId, int srcPhysicalIndex) {
this.host = hostName;
this.port = port;
+ this.srcPhysicalIndex = srcPhysicalIndex;
}
public String getHost() {
@@ -46,6 +54,10 @@ public class InputHost {
return this.port;
}
+ public int getSrcPhysicalIndex() {
+ return this.srcPhysicalIndex;
+ }
+
public int getNumPendingInputs() {
return inputs.size();
}
@@ -67,35 +79,45 @@ public class InputHost {
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
+ result = prime * result + srcPhysicalIndex;
return result;
}
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
InputHost other = (InputHost) obj;
if (host == null) {
- if (other.host != null)
+ if (other.host != null) {
return false;
+ }
} else if (!host.equals(other.host))
return false;
- if (port != other.port)
+ if (port != other.port) {
return false;
+ }
+ if (srcPhysicalIndex != other.srcPhysicalIndex) {
+ return false;
+ }
return true;
}
public String toDetailedString() {
- return "InputHost [host=" + host + ", port=" + port + ", inputs=" + inputs
- + "]";
+ return "InputHost [host=" + host + ", port=" + port + ", srcPhysicalIndex=" + srcPhysicalIndex
+ + ", inputs=" + inputs + "]";
}
@Override
public String toString() {
- return "InputHost [host=" + host + ", port=" + port + "]";
+ return "InputHost [host=" + host + ", port=" + port + ", srcPhysicalIndex=" + srcPhysicalIndex
+ + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
new file mode 100644
index 0000000..1ae6791
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.library.shuffle.common.impl;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Generic handler for the DataMovementEvents and InputFailedEvents received by Inputs which need to Shuffle data.
+ */
+public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
+
+ private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandlerImpl.class);
+
+ private final ShuffleManager shuffleManager;
+ private final FetchedInputAllocator inputAllocator;
+ private final CompressionCodec codec;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+
+
+ public ShuffleInputEventHandlerImpl(TezInputContext inputContext,
+ ShuffleManager shuffleManager,
+ FetchedInputAllocator inputAllocator, CompressionCodec codec,
+ boolean ifileReadAhead, int ifileReadAheadLength) {
+ this.shuffleManager = shuffleManager;
+ this.inputAllocator = inputAllocator;
+ this.codec = codec;
+ this.ifileReadAhead = ifileReadAhead;
+ this.ifileReadAheadLength = ifileReadAheadLength;
+ }
+
+ @Override
+ public void handleEvents(List<Event> events) throws IOException {
+ for (Event event : events) {
+ handleEvent(event);
+ }
+ }
+
+ private void handleEvent(Event event) throws IOException {
+ if (event instanceof DataMovementEvent) {
+ processDataMovementEvent((DataMovementEvent)event);
+ } else if (event instanceof InputFailedEvent) {
+ processInputFailedEvent((InputFailedEvent)event);
+ } else {
+ throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
+ }
+ }
+
+ private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
+ DataMovementEventPayloadProto shufflePayload;
+ try {
+ shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+ }
+ LOG.info("Processing DataMovementEvent with srcIndex: "
+ + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
+ + ", attemptNum: " + dme.getVersion() + ", payload: "
+ + stringify(shufflePayload));
+ if (shufflePayload.getOutputGenerated()) {
+ InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
+ dme.getTargetIndex(), dme.getVersion(),
+ shufflePayload.getPathComponent());
+ if (shufflePayload.hasData()) {
+ DataProto dataProto = shufflePayload.getData();
+ FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
+ moveDataToFetchedInput(dataProto, fetchedInput);
+ shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
+ } else {
+ shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, dme.getSourceIndex());
+ }
+ } else {
+ shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
+ }
+ }
+
+ private void moveDataToFetchedInput(DataProto dataProto,
+ FetchedInput fetchedInput) throws IOException {
+ switch (fetchedInput.getType()) {
+ case DISK:
+ ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, dataProto
+ .getData().newInput(), dataProto.getCompressedLength(), LOG);
+ break;
+ case MEMORY:
+ ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
+ dataProto.getData().newInput(), dataProto.getRawLength(),
+ dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG);
+ break;
+ case WAIT:
+ default:
+ throw new TezUncheckedException("Unexpected type: "
+ + fetchedInput.getType());
+ }
+ }
+
+ private void processInputFailedEvent(InputFailedEvent ife) {
+ InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
+ shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
+ }
+
+ private String stringify(DataMovementEventPayloadProto dmProto) {
+ StringBuilder sb = new StringBuilder(); // "[key: value, ...]" summary of the payload, used for logging
+ sb.append("[");
+ sb.append("outputGenerated: ").append(dmProto.getOutputGenerated()).append(", ");
+ sb.append("host: ").append(dmProto.getHost()).append(", ");
+ sb.append("port: ").append(dmProto.getPort()).append(", ");
+ sb.append("pathComponent: ").append(dmProto.getPathComponent()).append(", ");
+ sb.append("runDuration: ").append(dmProto.getRunDuration()).append(", ");
+ sb.append("hasDataInEvent: ").append(dmProto.hasData());
+ return sb.append("]").toString(); // close the "[" opened above so the logged payload is balanced
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 6b91558..c04c134 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -74,6 +74,9 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+// Handles a single srcIndex per targetIndex only: knownSrcHosts is keyed by host
+// name, so an InputHost keeps the srcIndex of the first input added for its host.
+// If a src task generates multiple outputs (src-indices) for one target, fix this.
public class ShuffleManager implements FetcherCallback {
private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
@@ -339,8 +342,8 @@ public class ShuffleManager implements FetcherCallback {
}
// TODO NEWTEZ Maybe limit the number of inputs being given to a single
// fetcher, especially in the case where #hosts < #fetchers
- fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
- pendingInputsForHost);
+ fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
+ inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
LOG.info("Created Fetcher for host: " + inputHost.getHost()
+ ", with inputs: " + pendingInputsForHost);
return fetcherBuilder.build();
@@ -349,10 +352,10 @@ public class ShuffleManager implements FetcherCallback {
/////////////////// Methods for InputEventHandler
public void addKnownInput(String hostName, int port,
- InputAttemptIdentifier srcAttemptIdentifier, int partition) {
+ InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
InputHost host = knownSrcHosts.get(hostName);
if (host == null) {
- host = new InputHost(hostName, port, inputContext.getApplicationId());
+ host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex);
InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
if (old != null) {
host = old;