You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:35 UTC
[28/50] [abbrv] Rename tez-engine-api to tez-runtime-api and
tez-engine is split into 2: - tez-engine-library for user-visible
Input/Output/Processor implementations - tez-engine-internals for framework
internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
new file mode 100644
index 0000000..ab7e5ba
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -0,0 +1,608 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
+import javax.net.ssl.HttpsURLConnection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Responsible for fetching inputs served by the ShuffleHandler for a single
+ * host. Construct using {@link FetcherBuilder}
+ */
+public class Fetcher implements Callable<FetchResult> {
+
+ private static final Log LOG = LogFactory.getLog(Fetcher.class);
+
+ private static final int UNIT_CONNECT_TIMEOUT = 60 * 1000;
+ private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
+
+ // Configurable fields.
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+ private int connectionTimeout;
+ private int readTimeout;
+
+ private final SecretKey shuffleSecret;
+ private final Configuration conf;
+
+ private final FetcherCallback fetcherCallback;
+ private final FetchedInputAllocator inputManager;
+ private final ApplicationId appId;
+
+ private static boolean sslShuffle;
+ private static SSLFactory sslFactory;
+ private static boolean sslFactoryInited;
+
+ private final int fetcherIdentifier;
+
+ // Parameters to track work.
+ private List<InputAttemptIdentifier> srcAttempts;
+ private String host;
+ private int port;
+ private int partition;
+
+ // Maps from the pathComponents (unique per srcTaskId) to the specific taskId
+ private Map<String, InputAttemptIdentifier> pathToAttemptMap;
+ private Set<InputAttemptIdentifier> remaining;
+
+ private URL url;
+ private String encHash;
+ private String msgToEncode;
+
+ private Fetcher(FetcherCallback fetcherCallback,
+ FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
+ Configuration conf) {
+ this.fetcherCallback = fetcherCallback;
+ this.inputManager = inputManager;
+ this.shuffleSecret = shuffleSecret;
+ this.appId = appId;
+ this.conf = conf;
+
+ this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
+
+ // TODO NEWTEZ Ideally, move this out from here into a static initializer block.
+ synchronized (Fetcher.class) {
+ if (!sslFactoryInited) {
+ sslFactoryInited = true;
+ sslShuffle = conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+ if (sslShuffle) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+ try {
+ sslFactory.init();
+ } catch (Exception ex) {
+ sslFactory.destroy();
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public FetchResult call() throws Exception {
+ if (srcAttempts.size() == 0) {
+ return new FetchResult(host, port, partition, srcAttempts);
+ }
+
+ for (InputAttemptIdentifier in : srcAttempts) {
+ pathToAttemptMap.put(in.getPathComponent(), in);
+ }
+
+ remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
+
+ HttpURLConnection connection;
+ try {
+ connection = connectToShuffleHandler(host, port, partition, srcAttempts);
+ } catch (IOException e) {
+ // ioErrs.increment(1);
+ // If connect did not succeed, just mark all the maps as failed,
+ // indirectly penalizing the host
+ for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
+ .hasNext();) {
+ fetcherCallback.fetchFailed(host, leftIter.next(), true);
+ leftIter.remove();
+ }
+ return new FetchResult(host, port, partition, remaining);
+ }
+
+ DataInputStream input;
+
+ try {
+ input = new DataInputStream(connection.getInputStream());
+ validateConnectionResponse(connection, url, msgToEncode, encHash);
+ } catch (IOException e) {
+ // ioErrs.increment(1);
+ // If we got a read error at this stage, it implies there was a problem
+ // with the first map, typically lost map. So, penalize only that map
+ // and add the rest
+ InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
+ fetcherCallback.fetchFailed(host, firstAttempt, false);
+ remaining.remove(firstAttempt);
+ return new FetchResult(host, port, partition, remaining);
+ }
+
+ // By this point, the connection is setup and the response has been
+ // validated.
+
+ // Loop through available map-outputs and fetch them
+ // On any error, faildTasks is not null and we exit
+ // after putting back the remaining maps to the
+ // yet_to_be_fetched list and marking the failed tasks.
+ InputAttemptIdentifier[] failedInputs = null;
+ while (!remaining.isEmpty() && failedInputs == null) {
+ failedInputs = fetchInputs(input);
+ }
+
+ if (failedInputs != null && failedInputs.length > 0) {
+ LOG.warn("copyInputs failed for tasks " + Arrays.toString(failedInputs));
+ for (InputAttemptIdentifier left : failedInputs) {
+ fetcherCallback.fetchFailed(host, left, false);
+ remaining.remove(left);
+ }
+ }
+
+ IOUtils.cleanup(LOG, input);
+
+ // Sanity check
+ if (failedInputs == null && !remaining.isEmpty()) {
+ throw new IOException("server didn't return all expected map outputs: "
+ + remaining.size() + " left.");
+ }
+
+ return new FetchResult(host, port, partition, remaining);
+
+ }
+
+ private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
+ FetchedInput fetchedInput = null;
+ InputAttemptIdentifier srcAttemptId = null;
+ long decompressedLength = -1;
+ long compressedLength = -1;
+
+ try {
+ long startTime = System.currentTimeMillis();
+ int responsePartition = -1;
+ // Read the shuffle header
+ try {
+ ShuffleHeader header = new ShuffleHeader();
+ header.readFields(input);
+ String pathComponent = header.getMapId();
+
+ srcAttemptId = pathToAttemptMap.get(pathComponent);
+ compressedLength = header.getCompressedLength();
+ decompressedLength = header.getUncompressedLength();
+ responsePartition = header.getPartition();
+ } catch (IllegalArgumentException e) {
+ // badIdErrs.increment(1);
+ LOG.warn("Invalid src id ", e);
+ // Don't know which one was bad, so consider all of them as bad
+ return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+ }
+
+ // Do some basic sanity verification
+ if (!verifySanity(compressedLength, decompressedLength,
+ responsePartition, srcAttemptId)) {
+ return new InputAttemptIdentifier[] { srcAttemptId };
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
+ + ", decomp len: " + decompressedLength);
+ }
+
+ // Get the location for the map output - either in-memory or on-disk
+ fetchedInput = inputManager.allocate(decompressedLength, srcAttemptId);
+
+ // TODO NEWTEZ No concept of WAIT at the moment.
+ // // Check if we can shuffle *now* ...
+ // if (fetchedInput.getType() == FetchedInput.WAIT) {
+ // LOG.info("fetcher#" + id +
+ // " - MergerManager returned Status.WAIT ...");
+ // //Not an error but wait to process data.
+ // return EMPTY_ATTEMPT_ID_ARRAY;
+ // }
+
+ // Go!
+ LOG.info("fetcher" + " about to shuffle output of srcAttempt "
+ + fetchedInput.getInputAttemptIdentifier() + " decomp: "
+ + decompressedLength + " len: " + compressedLength + " to "
+ + fetchedInput.getType());
+
+ if (fetchedInput.getType() == Type.MEMORY) {
+ shuffleToMemory((MemoryFetchedInput) fetchedInput, input,
+ (int) decompressedLength, (int) compressedLength);
+ } else {
+ shuffleToDisk((DiskFetchedInput) fetchedInput, input, compressedLength);
+ }
+
+ // Inform the shuffle scheduler
+ long endTime = System.currentTimeMillis();
+ fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
+ compressedLength, (endTime - startTime));
+
+ // Note successful shuffle
+ remaining.remove(srcAttemptId);
+ // metrics.successFetch();
+ return null;
+ } catch (IOException ioe) {
+ // ioErrs.increment(1);
+ if (srcAttemptId == null || fetchedInput == null) {
+ LOG.info("fetcher" + " failed to read map header" + srcAttemptId
+ + " decomp: " + decompressedLength + ", " + compressedLength, ioe);
+ if (srcAttemptId == null) {
+ return remaining
+ .toArray(new InputAttemptIdentifier[remaining.size()]);
+ } else {
+ return new InputAttemptIdentifier[] { srcAttemptId };
+ }
+ }
+ LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host,
+ ioe);
+
+ // Inform the shuffle-scheduler
+ try {
+ fetchedInput.abort();
+ } catch (IOException e) {
+ LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
+ }
+ // metrics.failedFetch();
+ return new InputAttemptIdentifier[] { srcAttemptId };
+ }
+ }
+
+ @SuppressWarnings("resource")
+ private void shuffleToMemory(MemoryFetchedInput fetchedInput,
+ InputStream input, int decompressedLength, int compressedLength)
+ throws IOException {
+ IFileInputStream checksumIn = new IFileInputStream(input, compressedLength,
+ conf);
+
+ input = checksumIn;
+
+ // Are map-outputs compressed?
+ if (codec != null) {
+ decompressor.reset();
+ input = codec.createInputStream(input, decompressor);
+ }
+ // Copy map-output into an in-memory buffer
+ byte[] shuffleData = fetchedInput.getBytes();
+
+ try {
+ IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
+ // metrics.inputBytes(shuffleData.length);
+ LOG.info("Read " + shuffleData.length + " bytes from input for "
+ + fetchedInput.getInputAttemptIdentifier());
+ } catch (IOException ioe) {
+ // Close the streams
+ IOUtils.cleanup(LOG, input);
+ // Re-throw
+ throw ioe;
+ }
+ }
+
+ private void shuffleToDisk(DiskFetchedInput fetchedInput, InputStream input,
+ long compressedLength) throws IOException {
+ // Copy data to local-disk
+ OutputStream output = fetchedInput.getOutputStream();
+ long bytesLeft = compressedLength;
+ try {
+ final int BYTES_TO_READ = 64 * 1024;
+ byte[] buf = new byte[BYTES_TO_READ];
+ while (bytesLeft > 0) {
+ int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+ if (n < 0) {
+ throw new IOException("read past end of stream reading "
+ + fetchedInput.getInputAttemptIdentifier());
+ }
+ output.write(buf, 0, n);
+ bytesLeft -= n;
+ // metrics.inputBytes(n);
+ }
+
+ LOG.info("Read " + (compressedLength - bytesLeft)
+ + " bytes from input for " + fetchedInput.getInputAttemptIdentifier());
+
+ output.close();
+ } catch (IOException ioe) {
+ // Close the streams
+ IOUtils.cleanup(LOG, input, output);
+
+ // Re-throw
+ throw ioe;
+ }
+
+ // Sanity check
+ if (bytesLeft != 0) {
+ throw new IOException("Incomplete input received for "
+ + fetchedInput.getInputAttemptIdentifier() + " from " + host + " ("
+ + bytesLeft + " bytes missing of " + compressedLength + ")");
+ }
+ }
+
+ /**
+ * Do some basic verification on the input received -- Being defensive
+ *
+ * @param compressedLength
+ * @param decompressedLength
+ * @param fetchPartition
+ * @param remaining
+ * @param mapId
+ * @return true/false, based on if the verification succeeded or not
+ */
+ private boolean verifySanity(long compressedLength, long decompressedLength,
+ int fetchPartition, InputAttemptIdentifier srcAttemptId) {
+ if (compressedLength < 0 || decompressedLength < 0) {
+ // wrongLengthErrs.increment(1);
+ LOG.warn(" invalid lengths in input header: id: " + srcAttemptId
+ + " len: " + compressedLength + ", decomp len: " + decompressedLength);
+ return false;
+ }
+
+ if (fetchPartition != this.partition) {
+ // wrongReduceErrs.increment(1);
+ LOG.warn(" data for the wrong reduce map: " + srcAttemptId + " len: "
+ + compressedLength + " decomp len: " + decompressedLength
+ + " for reduce " + fetchPartition);
+ return false;
+ }
+
+ // Sanity check
+ if (!remaining.contains(srcAttemptId)) {
+ // wrongMapErrs.increment(1);
+ LOG.warn("Invalid input. Received output for " + srcAttemptId);
+ return false;
+ }
+ return true;
+ }
+
+ private HttpURLConnection connectToShuffleHandler(String host, int port,
+ int partition, List<InputAttemptIdentifier> inputs) throws IOException {
+ try {
+ this.url = constructInputURL(host, port, partition, inputs);
+ HttpURLConnection connection = openConnection(url);
+
+ // generate hash of the url
+ this.msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+ this.encHash = SecureShuffleUtils.hashFromString(msgToEncode,
+ shuffleSecret);
+
+ // put url hash into http header
+ connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
+ encHash);
+ // set the read timeout
+ connection.setReadTimeout(readTimeout);
+ // put shuffle version into http header
+ connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+
+ connect(connection, connectionTimeout);
+ return connection;
+ } catch (IOException e) {
+ LOG.warn("Failed to connect to " + host + " with " + srcAttempts.size()
+ + " inputs", e);
+ throw e;
+ }
+ }
+
+ private void validateConnectionResponse(HttpURLConnection connection,
+ URL url, String msgToEncode, String encHash) throws IOException {
+ int rc = connection.getResponseCode();
+ if (rc != HttpURLConnection.HTTP_OK) {
+ throw new IOException("Got invalid response code " + rc + " from " + url
+ + ": " + connection.getResponseMessage());
+ }
+
+ if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(connection
+ .getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(connection
+ .getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
+ throw new IOException("Incompatible shuffle response version");
+ }
+
+ // get the replyHash which is HMac of the encHash we sent to the server
+ String replyHash = connection
+ .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+ if (replyHash == null) {
+ throw new IOException("security validation of TT Map output failed");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
+ + replyHash);
+ }
+ // verify that replyHash is HMac of encHash
+ SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecret);
+ LOG.info("for url=" + msgToEncode + " sent hash and receievd reply");
+ }
+
+ protected HttpURLConnection openConnection(URL url) throws IOException {
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ if (sslShuffle) {
+ HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
+ try {
+ httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
+ } catch (GeneralSecurityException ex) {
+ throw new IOException(ex);
+ }
+ httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
+ }
+ return conn;
+ }
+
+ /**
+ * The connection establishment is attempted multiple times and is given up
+ * only on the last failure. Instead of connecting with a timeout of X, we try
+ * connecting with a timeout of x < X but multiple times.
+ */
+ private void connect(URLConnection connection, int connectionTimeout)
+ throws IOException {
+ int unit = 0;
+ if (connectionTimeout < 0) {
+ throw new IOException("Invalid timeout " + "[timeout = "
+ + connectionTimeout + " ms]");
+ } else if (connectionTimeout > 0) {
+ unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
+ }
+ // set the connect timeout to the unit-connect-timeout
+ connection.setConnectTimeout(unit);
+ while (true) {
+ try {
+ connection.connect();
+ break;
+ } catch (IOException ioe) {
+ // update the total remaining connect-timeout
+ connectionTimeout -= unit;
+
+ // throw an exception if we have waited for timeout amount of time
+ // note that the updated value if timeout is used here
+ if (connectionTimeout == 0) {
+ throw ioe;
+ }
+
+ // reset the connect timeout for the last try
+ if (connectionTimeout < unit) {
+ unit = connectionTimeout;
+ // reset the connect time out for the final connect
+ connection.setConnectTimeout(unit);
+ }
+ }
+ }
+ }
+
+ private URL constructInputURL(String host, int port, int partition,
+ List<InputAttemptIdentifier> inputs) throws MalformedURLException {
+ StringBuilder url = ShuffleUtils.constructBaseURIForShuffleHandler(host,
+ port, partition, appId);
+ boolean first = true;
+ for (InputAttemptIdentifier input : inputs) {
+ if (first) {
+ first = false;
+ url.append(input.getPathComponent());
+ } else {
+ url.append(",").append(input.getPathComponent());
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("InputFetch URL for: " + host + " : " + url.toString());
+ }
+ return new URL(url.toString());
+ }
+
+ /**
+ * Builder for the construction of Fetchers
+ */
+ public static class FetcherBuilder {
+ private Fetcher fetcher;
+ private boolean workAssigned = false;
+
+ public FetcherBuilder(FetcherCallback fetcherCallback,
+ FetchedInputAllocator inputManager, ApplicationId appId,
+ SecretKey shuffleSecret, Configuration conf) {
+ this.fetcher = new Fetcher(fetcherCallback, inputManager, appId,
+ shuffleSecret, conf);
+ }
+
+ public FetcherBuilder setCompressionParameters(CompressionCodec codec,
+ Decompressor decompressor) {
+ fetcher.codec = codec;
+ fetcher.decompressor = decompressor;
+ return this;
+ }
+
+ public FetcherBuilder setConnectionParameters(int connectionTimeout,
+ int readTimeout) {
+ fetcher.connectionTimeout = connectionTimeout;
+ fetcher.readTimeout = readTimeout;
+ return this;
+ }
+
+ public FetcherBuilder assignWork(String host, int port, int partition,
+ List<InputAttemptIdentifier> inputs) {
+ fetcher.host = host;
+ fetcher.port = port;
+ fetcher.partition = partition;
+ fetcher.srcAttempts = inputs;
+ workAssigned = true;
+ return this;
+ }
+
+ public Fetcher build() {
+ Preconditions.checkState(workAssigned == true,
+ "Cannot build a fetcher withot assigning work to it");
+ return fetcher;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return fetcherIdentifier;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Fetcher other = (Fetcher) obj;
+ if (fetcherIdentifier != other.fetcherIdentifier)
+ return false;
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetcherCallback.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetcherCallback.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetcherCallback.java
new file mode 100644
index 0000000..18504b1
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetcherCallback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+public interface FetcherCallback {
+
+ public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes, long copyDuration) throws IOException;
+
+ public void fetchFailed(String host, InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
new file mode 100644
index 0000000..66605dd
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+public class InputHost {
+
+ private final String host;
+ private final int port;
+
+ private final List<InputAttemptIdentifier> inputs = new LinkedList<InputAttemptIdentifier>();
+
+ public InputHost(String hostName, int port, ApplicationId appId) {
+ this.host = hostName;
+ this.port = port;
+ }
+
+ public String getHost() {
+ return this.host;
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public synchronized int getNumPendingInputs() {
+ return inputs.size();
+ }
+
+ public synchronized void addKnownInput(InputAttemptIdentifier srcAttempt) {
+ inputs.add(srcAttempt);
+ }
+
+ public synchronized List<InputAttemptIdentifier> clearAndGetPendingInputs() {
+ List<InputAttemptIdentifier> inputsCopy = new ArrayList<InputAttemptIdentifier>(
+ inputs);
+ inputs.clear();
+ return inputsCopy;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ InputHost other = (InputHost) obj;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ if (port != other.port)
+ return false;
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
new file mode 100644
index 0000000..f56877f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+import com.google.common.base.Preconditions;
+
+public class MemoryFetchedInput extends FetchedInput {
+
+ private BoundedByteArrayOutputStream byteStream;
+
+ public MemoryFetchedInput(long size,
+ InputAttemptIdentifier inputAttemptIdentifier,
+ FetchedInputCallback callbackHandler) {
+ super(Type.MEMORY, size, inputAttemptIdentifier, callbackHandler);
+ this.byteStream = new BoundedByteArrayOutputStream((int) size);
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ return byteStream;
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return new ByteArrayInputStream(byteStream.getBuffer());
+ }
+
+ public byte[] getBytes() {
+ return byteStream.getBuffer();
+ }
+
+ @Override
+ public void commit() {
+ if (state == State.PENDING) {
+ state = State.COMMITTED;
+ notifyFetchComplete();
+ }
+ }
+
+ @Override
+ public void abort() {
+ if (state == State.PENDING) {
+ state = State.ABORTED;
+ notifyFetchFailure();
+ }
+ }
+
+ @Override
+ public void free() {
+ Preconditions.checkState(
+ state == State.COMMITTED || state == State.ABORTED,
+ "FetchedInput can only be freed after it is committed or aborted");
+ if (state == State.COMMITTED) {
+ state = State.FREED;
+ this.byteStream = null;
+ notifyFreedResource();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "MemoryFetchedInput [inputAttemptIdentifier="
+ + inputAttemptIdentifier + ", size=" + size + ", type=" + type
+ + ", id=" + id + ", state=" + state + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
new file mode 100644
index 0000000..7479e7e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
+import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
+
+public class ShuffleUtils {
+
+ public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce.shuffle";
+
+ public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
+ throws IOException {
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(meta);
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+ jt.readFields(in);
+ SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
+ return sk;
+ }
+
+ public static ByteBuffer convertJobTokenToBytes(
+ Token<JobTokenIdentifier> jobToken) throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ jobToken.write(dob);
+ ByteBuffer bb = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ return bb;
+ }
+
+ public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
+ throws IOException {
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ try {
+ in.reset(meta);
+ int port = in.readInt();
+ return port;
+ } finally {
+ in.close();
+ }
+ }
+
+ // TODO NEWTEZ handle ssl shuffle
+ public static StringBuilder constructBaseURIForShuffleHandler(String host, int port, int partition, ApplicationId appId) {
+ StringBuilder sb = new StringBuilder("http://");
+ sb.append(host);
+ sb.append(":");
+ sb.append(String.valueOf(port));
+ sb.append("/");
+ sb.append("mapOutput?job=");
+ sb.append(appId.toString().replace("application", "job"));
+ sb.append("&reduce=");
+ sb.append(String.valueOf(partition));
+ sb.append("&map=");
+ return sb;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
new file mode 100644
index 0000000..1fb000f
--- /dev/null
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
// Generated classes live alongside the shuffle implementation.
option java_package = "org.apache.tez.runtime.library.shuffle.impl";
option java_outer_classname = "ShuffleUserPayloads";
option java_generate_equals_and_hash = true;

// Payload carried by a DataMovementEvent: tells the consumer where (host,
// port, path component) a producer's output can be fetched from.
message DataMovementEventPayloadProto {
  optional bool output_generated = 1;   // false when the producer emitted no data
  optional string host = 2;             // shuffle-handler host serving the output
  optional int32 port = 3;              // shuffle-handler port
  optional string path_component = 4;   // unique per source task attempt
  optional int32 run_duration = 5;      // producer run time; units not stated here — confirm
}

// Payload for input-information events; carries the partition range only.
message InputInformationEventPayloadProto {
  optional int32 partition_range = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 6496b55..a6d250f 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -95,8 +95,6 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -104,6 +102,8 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import com.google.common.annotations.VisibleForTesting;