You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [7/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-amp...
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * Look through tokens to find the first job token that matches the service
+ * and return it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenSelector implements TokenSelector<JobTokenIdentifier> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Token<JobTokenIdentifier> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens) {
+ if (service == null) {
+ return null;
+ }
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ if (JobTokenIdentifier.KIND_NAME.equals(token.getKind())
+ && service.equals(token.getService())) {
+ return (Token<JobTokenIdentifier>) token;
+ }
+ }
+ return null;
+ }
+}
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.security;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+
+import javax.crypto.SecretKey;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Utilities for generating keys and hashes, and for verifying them, for the
+ * secure shuffle.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SecureShuffleUtils {
+  public static final String HTTP_HEADER_URL_HASH = "UrlHash";
+  public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
+
+  /**
+   * Charset used for every String/byte conversion in this class. It is fixed
+   * so that both ends of the shuffle hash exactly the same bytes regardless
+   * of the platform default charset of the JVM they run in.
+   */
+  private static final java.nio.charset.Charset UTF_8 =
+      java.nio.charset.Charset.forName("UTF-8");
+
+  /**
+   * Computes the Base64 encoded hash of a message.
+   *
+   * @param msg the message bytes to hash
+   * @param key the secret key used to compute the hash
+   * @return the Base64 encoded hash of {@code msg}
+   */
+  public static String generateHash(byte[] msg, SecretKey key) {
+    return new String(Base64.encodeBase64(generateByteHash(msg, key)), UTF_8);
+  }
+
+  /**
+   * Computes the raw hash of a message by delegating to
+   * {@link JobTokenSecretManager#computeHash}.
+   *
+   * @param msg the message bytes to hash
+   * @param key the secret key used to compute the hash
+   * @return the raw hash bytes
+   */
+  private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+    return JobTokenSecretManager.computeHash(msg, key);
+  }
+
+  /**
+   * Verifies that {@code hash} equals the hash of {@code msg} under
+   * {@code key}.
+   *
+   * @param hash the hash received from the peer
+   * @param msg the message the hash is supposed to cover
+   * @param key the secret key used to compute the hash
+   * @return true if the two hashes are identical
+   */
+  private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+    byte[] msg_hash = generateByteHash(msg, key);
+    // MessageDigest.isEqual is used instead of an early-exit byte compare so
+    // the comparison time does not depend on where the first mismatch is.
+    return java.security.MessageDigest.isEqual(msg_hash, hash);
+  }
+
+  /**
+   * Aux util to calculate the hash of a String.
+   *
+   * @param enc_str the string to hash (converted to bytes as UTF-8)
+   * @param key the secret key used to compute the hash
+   * @return the Base64 encoded hash
+   * @throws IOException declared for callers; not thrown by this
+   *           implementation
+   */
+  public static String hashFromString(String enc_str, SecretKey key)
+      throws IOException {
+    return generateHash(enc_str.getBytes(UTF_8), key);
+  }
+
+  /**
+   * Verifies that {@code base64Hash} is the hash of {@code msg}.
+   *
+   * @param base64Hash the Base64 encoded hash to check
+   * @param msg the message the hash is supposed to cover
+   * @param key the secret key used to compute the hash
+   * @throws IOException if the hashes are not the same
+   */
+  public static void verifyReply(String base64Hash, String msg, SecretKey key)
+      throws IOException {
+    byte[] hash = Base64.decodeBase64(base64Hash.getBytes(UTF_8));
+
+    if (!verifyHash(hash, msg.getBytes(UTF_8), key)) {
+      throw new IOException("Verification of the hashReply failed");
+    }
+  }
+
+  /**
+   * Shuffle specific utils - build the string to hash from a URL.
+   *
+   * @param url the URL whose port, path and query are used
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(URL url) {
+    return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
+  }
+
+  /**
+   * Shuffle specific utils - build the string to hash from a servlet
+   * request.
+   *
+   * @param request the request whose local port, request URI and query
+   *          string are used
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(HttpServletRequest request) {
+    return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
+        request.getLocalPort());
+  }
+
+  /**
+   * Shuffle specific utils - concatenates port, path, "?" and query.
+   *
+   * @param uri_path the path component
+   * @param uri_query the query component
+   * @param port the port number
+   * @return string for encoding
+   */
+  private static String buildMsgFrom(String uri_path, String uri_query, int port) {
+    return String.valueOf(port) + uri_path + "?" + uri_query;
+  }
+
+  /**
+   * Converts a byte array to a lower-case hex string. Every byte is rendered
+   * as exactly two hex digits so the result is unambiguous.
+   *
+   * @param ba the bytes to render
+   * @return string with the hex value of the bytes
+   */
+  public static String toHex(byte[] ba) {
+    StringBuilder hex = new StringBuilder(ba.length * 2);
+    for (byte b : ba) {
+      // %02x keeps the leading zero of values below 0x10.
+      hex.append(String.format("%02x", b));
+    }
+    return hex.toString();
+  }
+}
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.api.Master;
+import org.apache.tez.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskDependencyCompletionEventsUpdate;
+
+/**
+ * Daemon thread that repeatedly asks the {@link Master} for completion
+ * events of the tasks this reduce depends on and forwards them to the
+ * {@link ShuffleScheduler}.
+ */
+class EventFetcher extends Thread {
+  private static final long SLEEP_TIME = 1000;
+  private static final int MAX_RETRIES = 10;
+  private static final int RETRY_PERIOD = 5000;
+  private static final Log LOG = LogFactory.getLog(EventFetcher.class);
+
+  private final TezTaskAttemptID reduce;
+  private final Master umbilical;
+  private final ShuffleScheduler scheduler;
+  private final int maxEventsToFetch;
+  private final ExceptionReporter exceptionReporter;
+
+  // Index of the next completion event to request.
+  private int fromEventIdx = 0;
+
+  // Largest task run time seen so far among SUCCEEDED events.
+  private int maxMapRuntime = 0;
+
+  private volatile boolean stopped = false;
+
+  /**
+   * Creates the (daemon) event fetcher thread; it is not started here.
+   *
+   * @param reduce the task attempt on whose behalf events are fetched
+   * @param umbilical the master to query for completion events
+   * @param scheduler the scheduler that is told about the events
+   * @param reporter receives any fatal error raised by {@link #run()}
+   * @param maxEventsToFetch maximum number of events requested per call
+   */
+  public EventFetcher(TezTaskAttemptID reduce,
+                      Master umbilical,
+                      ShuffleScheduler scheduler,
+                      ExceptionReporter reporter,
+                      int maxEventsToFetch) {
+    setName("EventFetcher for fetching Map Completion Events");
+    setDaemon(true);
+    this.reduce = reduce;
+    this.umbilical = umbilical;
+    this.scheduler = scheduler;
+    this.exceptionReporter = reporter;
+    this.maxEventsToFetch = maxEventsToFetch;
+  }
+
+  /**
+   * Polls for completion events until stopped or interrupted. An
+   * {@link IOException} is retried after {@code RETRY_PERIOD} ms; after
+   * {@code MAX_RETRIES} consecutive failures the error is handed to the
+   * exception reporter.
+   */
+  @Override
+  public void run() {
+    int failures = 0;
+    LOG.info(reduce + " Thread started: " + getName());
+
+    try {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          int numNewMaps = getMapCompletionEvents();
+          failures = 0;
+          if (numNewMaps > 0) {
+            LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
+          }
+          if (!Thread.currentThread().isInterrupted()) {
+            Thread.sleep(SLEEP_TIME);
+          }
+        } catch (InterruptedException e) {
+          LOG.info("EventFetcher is interrupted.. Returning");
+          return;
+        } catch (IOException ie) {
+          LOG.info("Exception in getting events", ie);
+          // check to see whether to abort
+          if (++failures >= MAX_RETRIES) {
+            throw new IOException("too many failures downloading events", ie);
+          }
+          // sleep for a bit
+          if (!Thread.currentThread().isInterrupted()) {
+            Thread.sleep(RETRY_PERIOD);
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      // Interrupted while backing off after a failure: just exit.
+      return;
+    } catch (Throwable t) {
+      exceptionReporter.reportException(t);
+      return;
+    }
+  }
+
+  /**
+   * Asks this thread to stop and waits up to five seconds for it to exit.
+   * If the calling thread is interrupted while waiting, its interrupt status
+   * is restored so the caller can still observe the interruption.
+   */
+  public void shutDown() {
+    this.stopped = true;
+    interrupt();
+    try {
+      join(5000);
+    } catch (InterruptedException ie) {
+      LOG.warn("Got interrupted while joining " + getName(), ie);
+      // Do not swallow the caller's interrupt.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Queries the master for a set of completion events starting from the
+   * last seen event index, and passes each one to the scheduler. Keeps
+   * querying as long as a full batch of {@code maxEventsToFetch} events
+   * comes back.
+   *
+   * @return the number of SUCCEEDED events seen during this call
+   * @throws IOException if querying the master fails
+   */
+  protected int getMapCompletionEvents() throws IOException {
+
+    int numNewMaps = 0;
+    TezDependentTaskCompletionEvent[] events;
+
+    do {
+      TezTaskDependencyCompletionEventsUpdate update =
+          umbilical.getDependentTasksCompletionEvents(
+              reduce.getJobID(),
+              fromEventIdx,
+              maxEventsToFetch,
+              reduce);
+      events = update.getDependentTaskCompletionEvents();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got " + events.length + " map completion events from " +
+            fromEventIdx);
+      }
+
+      // Check if the reset is required.
+      // Since there is no ordering of the task completion events at the
+      // reducer, the only option to sync with the new jobtracker is to reset
+      // the events index
+      if (update.shouldReset()) {
+        fromEventIdx = 0;
+        scheduler.resetKnownMaps();
+      }
+
+      // Update the last seen event ID
+      fromEventIdx += events.length;
+
+      // Process the TaskCompletionEvents:
+      // 1. SUCCEEDED: register the output location with the scheduler.
+      // 2. OBSOLETE/FAILED/KILLED: tell the scheduler the output is obsolete
+      //    so it stops fetching from those maps.
+      // 3. TIPFAILED: tell the scheduler the task failed for good, since its
+      //    output is not needed at all.
+      for (TezDependentTaskCompletionEvent event : events) {
+        switch (event.getStatus()) {
+          case SUCCEEDED:
+            URI u = getBaseURI(event.getTaskTrackerHttp());
+            scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
+                u.toString(),
+                event.getTaskAttemptID());
+            numNewMaps++;
+            int duration = event.getTaskRunTime();
+            if (duration > maxMapRuntime) {
+              maxMapRuntime = duration;
+              scheduler.informMaxMapRunTime(maxMapRuntime);
+            }
+            break;
+          case FAILED:
+          case KILLED:
+          case OBSOLETE:
+            scheduler.obsoleteMapOutput(event.getTaskAttemptID());
+            LOG.info("Ignoring obsolete output of " + event.getStatus() +
+                " map-task: '" + event.getTaskAttemptID() + "'");
+            break;
+          case TIPFAILED:
+            scheduler.tipFailed(event.getTaskAttemptID().getTaskID());
+            LOG.info("Ignoring output of failed map TIP: '" +
+                event.getTaskAttemptID() + "'");
+            break;
+        }
+      }
+    } while (events.length == maxEventsToFetch);
+
+    return numNewMaps;
+  }
+
+  /**
+   * Builds the base fetch URI for a host: the given URL (with a trailing
+   * slash added if missing) followed by
+   * {@code mapOutput?job=<job>&reduce=<reduce id>&map=}.
+   *
+   * @param url the HTTP address reported in the completion event
+   * @return the base URI, to which map ids are appended by the fetcher
+   */
+  private URI getBaseURI(String url) {
+    StringBuilder baseUrl = new StringBuilder(url);
+    if (!url.endsWith("/")) {
+      baseUrl.append("/");
+    }
+    baseUrl.append("mapOutput?job=");
+    baseUrl.append(reduce.getJobID());
+    baseUrl.append("&reduce=");
+    baseUrl.append(reduce.getTaskID().getId());
+    baseUrl.append("&map=");
+    return URI.create(baseUrl.toString());
+  }
+}
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+/**
+ * An interface for reporting exceptions to other threads.
+ * <p>
+ * {@link #reportException(Throwable)} receives the {@link Throwable} that
+ * the reporting thread caught; the fetcher threads in this package call it
+ * from the catch-all handler of their {@code run()} method.
+ */
+interface ExceptionReporter {
+ void reportException(Throwable t);
+}
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.crypto.SecretKey;
+import javax.net.ssl.HttpsURLConnection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.IDUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.security.SecureShuffleUtils;
+import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
+import org.apache.tez.engine.common.sort.impl.IFileInputStream;
+import org.apache.tez.records.TezTaskAttemptID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+class Fetcher extends Thread {
+
+ private static final Log LOG = LogFactory.getLog(Fetcher.class);
+
+ /** Basic/unit connection timeout (in milliseconds) */
+ private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
+
+ private final Progressable reporter;
+ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+ CONNECTION, WRONG_REDUCE}
+
+ private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+ private final TezCounter connectionErrs;
+ private final TezCounter ioErrs;
+ private final TezCounter wrongLengthErrs;
+ private final TezCounter badIdErrs;
+ private final TezCounter wrongMapErrs;
+ private final TezCounter wrongReduceErrs;
+ private final MergeManager merger;
+ private final ShuffleScheduler scheduler;
+ private final ShuffleClientMetrics metrics;
+ private final ExceptionReporter exceptionReporter;
+ private final int id;
+ private static int nextId = 0;
+ private final int reduce;
+
+ private final int connectionTimeout;
+ private final int readTimeout;
+
+ // Decompression of map-outputs
+ private final CompressionCodec codec;
+ private final Decompressor decompressor;
+ private final SecretKey jobTokenSecret;
+
+ private volatile boolean stopped = false;
+
+ private Configuration job;
+
+ private static boolean sslShuffle;
+ private static SSLFactory sslFactory;
+
+ public Fetcher(Configuration job, TezTaskAttemptID reduceId,
+ ShuffleScheduler scheduler, MergeManager merger,
+ TezTaskReporter reporter, ShuffleClientMetrics metrics,
+ ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
+ this.job = job;
+ this.reporter = reporter;
+ this.scheduler = scheduler;
+ this.merger = merger;
+ this.metrics = metrics;
+ this.exceptionReporter = exceptionReporter;
+ this.id = ++nextId;
+ this.reduce = reduceId.getTaskID().getId();
+ this.jobTokenSecret = jobTokenSecret;
+ ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.IO_ERROR.toString());
+ wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.WRONG_LENGTH.toString());
+ badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.BAD_ID.toString());
+ wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.WRONG_MAP.toString());
+ connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.CONNECTION.toString());
+ wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.WRONG_REDUCE.toString());
+
+ if (ConfigUtils.getCompressMapOutput(job)) {
+ Class<? extends CompressionCodec> codecClass =
+ ConfigUtils.getMapOutputCompressorClass(job, DefaultCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, job);
+ decompressor = CodecPool.getDecompressor(codec);
+ } else {
+ codec = null;
+ decompressor = null;
+ }
+
+ this.connectionTimeout =
+ job.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT);
+ this.readTimeout =
+ job.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+
+ setName("fetcher#" + id);
+ setDaemon(true);
+
+ synchronized (Fetcher.class) {
+ sslShuffle = job.getBoolean(TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
+ if (sslShuffle && sslFactory == null) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
+ try {
+ sslFactory.init();
+ } catch (Exception ex) {
+ sslFactory.destroy();
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+ /**
+  * Fetch loop. Until stopped or interrupted: waits for any in-memory merge
+  * to finish, asks the scheduler for a host, copies from it, and finally
+  * gives the host back to the scheduler. Any error other than an
+  * interruption is handed to the exception reporter.
+  */
+ public void run() {
+   try {
+     while (!(stopped || Thread.currentThread().isInterrupted())) {
+       MapHost source = null;
+       try {
+         // Hold off while an in-memory merge is in progress
+         merger.waitForInMemoryMerge();
+
+         // Pick the next host to pull map outputs from
+         source = scheduler.getHost();
+         metrics.threadBusy();
+
+         // Pull the outputs
+         copyFromHost(source);
+       } finally {
+         // Only undo what was actually acquired
+         if (source != null) {
+           scheduler.freeHost(source);
+           metrics.threadFree();
+         }
+       }
+     }
+   } catch (InterruptedException ie) {
+     // Interrupted: leave the loop and let the thread end quietly
+   } catch (Throwable t) {
+     exceptionReporter.reportException(t);
+   }
+ }
+
+ public void shutDown() throws InterruptedException {
+ this.stopped = true;
+ interrupt();
+ try {
+ join(5000);
+ } catch (InterruptedException ie) {
+ LOG.warn("Got interrupt while joining " + getName(), ie);
+ }
+ if (sslFactory != null) {
+ sslFactory.destroy();
+ }
+ }
+
+ /**
+  * Opens (but does not connect) an HTTP connection to the given URL. When
+  * SSL shuffle is enabled the connection is additionally given the socket
+  * factory and hostname verifier of the shared SSL factory.
+  *
+  * @param url the URL to open
+  * @return the opened connection
+  * @throws IOException if the connection cannot be opened or the SSL socket
+  *           factory cannot be created
+  */
+ @VisibleForTesting
+ protected HttpURLConnection openConnection(URL url) throws IOException {
+   HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+   if (!sslShuffle) {
+     return connection;
+   }
+
+   HttpsURLConnection secure = (HttpsURLConnection) connection;
+   try {
+     secure.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
+   } catch (GeneralSecurityException ex) {
+     throw new IOException(ex);
+   }
+   secure.setHostnameVerifier(sslFactory.getHostnameVerifier());
+   return connection;
+ }
+
+ /**
+  * The crux of the matter: fetches the map outputs currently available on
+  * one host over a single HTTP connection.
+  * <p>
+  * The request URL is hashed with the job token secret and sent in the
+  * {@code UrlHash} header; the server's {@code ReplyHash} header is verified
+  * before any data is read. Map outputs that were not fetched are always
+  * put back to the scheduler. The response stream is closed on every path
+  * once it has been opened.
+  *
+  * @param host {@link MapHost} from which we need to
+  *          shuffle available map-outputs.
+  * @throws IOException if the server did not return all expected map
+  *           outputs although no copy failed
+  */
+ @VisibleForTesting
+ protected void copyFromHost(MapHost host) throws IOException {
+   // Get completed maps on 'host'
+   List<TezTaskAttemptID> maps = scheduler.getMapsForHost(host);
+
+   // Sanity check to catch hosts with only 'OBSOLETE' maps,
+   // especially at the tail of large jobs
+   if (maps.size() == 0) {
+     return;
+   }
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+         + maps);
+   }
+
+   // List of maps to be fetched yet
+   Set<TezTaskAttemptID> remaining = new HashSet<TezTaskAttemptID>(maps);
+
+   // Construct the url and connect
+   DataInputStream input = null;
+   boolean connectSucceeded = false;
+
+   try {
+     URL url = getMapOutputURL(host, maps);
+     HttpURLConnection connection = openConnection(url);
+
+     // generate hash of the url
+     String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+     String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
+
+     // put url hash into http header
+     connection.addRequestProperty(
+         SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+     // set the read timeout
+     connection.setReadTimeout(readTimeout);
+     connect(connection, connectionTimeout);
+     connectSucceeded = true;
+     input = new DataInputStream(connection.getInputStream());
+
+     // Validate response code
+     int rc = connection.getResponseCode();
+     if (rc != HttpURLConnection.HTTP_OK) {
+       throw new IOException(
+           "Got invalid response code " + rc + " from " + url +
+           ": " + connection.getResponseMessage());
+     }
+
+     // get the replyHash which is HMac of the encHash we sent to the server
+     String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+     if (replyHash == null) {
+       throw new IOException("security validation of TT Map output failed");
+     }
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
+     }
+     // verify that replyHash is HMac of encHash
+     SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
+     LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
+   } catch (IOException ie) {
+     // The stream may already be open when the response code or the reply
+     // hash turns out to be bad; release it before giving up on this host.
+     IOUtils.cleanup(LOG, input);
+
+     ioErrs.increment(1);
+     LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
+         " map outputs", ie);
+
+     // If connect did not succeed, just mark all the maps as failed,
+     // indirectly penalizing the host
+     if (!connectSucceeded) {
+       for (TezTaskAttemptID left : remaining) {
+         scheduler.copyFailed(left, host, connectSucceeded);
+       }
+     } else {
+       // If we got a read error at this stage, it implies there was a problem
+       // with the first map, typically lost map. So, penalize only that map
+       // and add the rest
+       TezTaskAttemptID firstMap = maps.get(0);
+       scheduler.copyFailed(firstMap, host, connectSucceeded);
+     }
+
+     // Add back all the remaining maps, WITHOUT marking them as failed
+     for (TezTaskAttemptID left : remaining) {
+       scheduler.putBackKnownMapOutput(host, left);
+     }
+
+     return;
+   }
+
+   try {
+     // Loop through available map-outputs and fetch them
+     // On any error, failedTasks is not null and we exit
+     // after putting back the remaining maps to the
+     // yet_to_be_fetched list and marking the failed tasks.
+     TezTaskAttemptID[] failedTasks = null;
+     while (!remaining.isEmpty() && failedTasks == null) {
+       failedTasks = copyMapOutput(host, input, remaining);
+     }
+
+     if (failedTasks != null && failedTasks.length > 0) {
+       LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+       for (TezTaskAttemptID left : failedTasks) {
+         scheduler.copyFailed(left, host, true);
+       }
+     }
+
+     // Sanity check
+     if (failedTasks == null && !remaining.isEmpty()) {
+       throw new IOException("server didn't return all expected map outputs: "
+           + remaining.size() + " left.");
+     }
+   } finally {
+     // Close the stream even if copying threw unexpectedly.
+     IOUtils.cleanup(LOG, input);
+     for (TezTaskAttemptID left : remaining) {
+       scheduler.putBackKnownMapOutput(host, left);
+     }
+   }
+ }
+
+ private static TezTaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TezTaskAttemptID[0];
+
+ private TezTaskAttemptID[] copyMapOutput(MapHost host,
+ DataInputStream input,
+ Set<TezTaskAttemptID> remaining) {
+ MapOutput mapOutput = null;
+ TezTaskAttemptID mapId = null;
+ long decompressedLength = -1;
+ long compressedLength = -1;
+
+ try {
+ long startTime = System.currentTimeMillis();
+ int forReduce = -1;
+ //Read the shuffle header
+ try {
+ ShuffleHeader header = new ShuffleHeader();
+ header.readFields(input);
+ mapId = IDUtils.toTaskAttemptId(header.mapId);
+ compressedLength = header.compressedLength;
+ decompressedLength = header.uncompressedLength;
+ forReduce = header.forReduce;
+ } catch (IllegalArgumentException e) {
+ badIdErrs.increment(1);
+ LOG.warn("Invalid map id ", e);
+ //Don't know which one was bad, so consider all of them as bad
+ return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+ }
+
+
+ // Do some basic sanity verification
+ if (!verifySanity(compressedLength, decompressedLength, forReduce,
+ remaining, mapId)) {
+ return new TezTaskAttemptID[] {mapId};
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("header: " + mapId + ", len: " + compressedLength +
+ ", decomp len: " + decompressedLength);
+ }
+
+ // Get the location for the map output - either in-memory or on-disk
+ mapOutput = merger.reserve(mapId, decompressedLength, id);
+
+ // Check if we can shuffle *now* ...
+ if (mapOutput.getType() == Type.WAIT) {
+ LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
+ //Not an error but wait to process data.
+ return EMPTY_ATTEMPT_ID_ARRAY;
+ }
+
+ // Go!
+ LOG.info("fetcher#" + id + " about to shuffle output of map " +
+ mapOutput.getMapId() + " decomp: " +
+ decompressedLength + " len: " + compressedLength + " to " +
+ mapOutput.getType());
+ if (mapOutput.getType() == Type.MEMORY) {
+ shuffleToMemory(host, mapOutput, input,
+ (int) decompressedLength, (int) compressedLength);
+ } else {
+ shuffleToDisk(host, mapOutput, input, compressedLength);
+ }
+
+ // Inform the shuffle scheduler
+ long endTime = System.currentTimeMillis();
+ scheduler.copySucceeded(mapId, host, compressedLength,
+ endTime - startTime, mapOutput);
+ // Note successful shuffle
+ remaining.remove(mapId);
+ metrics.successFetch();
+ return null;
+ } catch (IOException ioe) {
+ ioErrs.increment(1);
+ if (mapId == null || mapOutput == null) {
+ LOG.info("fetcher#" + id + " failed to read map header" +
+ mapId + " decomp: " +
+ decompressedLength + ", " + compressedLength, ioe);
+ if(mapId == null) {
+ return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+ } else {
+ return new TezTaskAttemptID[] {mapId};
+ }
+ }
+
+ LOG.warn("Failed to shuffle output of " + mapId +
+ " from " + host.getHostName(), ioe);
+
+ // Inform the shuffle-scheduler
+ mapOutput.abort();
+ metrics.failedFetch();
+ return new TezTaskAttemptID[] {mapId};
+ }
+
+ }
+
+ /**
+  * Defensive checks on a freshly read shuffle header. Each failed check
+  * bumps its own error counter and logs a warning.
+  *
+  * @param compressedLength length announced for the data on the wire
+  * @param decompressedLength length announced for the decompressed data
+  * @param forReduce reduce id the data is addressed to
+  * @param remaining map outputs still expected from the host
+  * @param mapId map task attempt named in the header
+  * @return true if all checks pass, false as soon as one fails
+  */
+ private boolean verifySanity(long compressedLength, long decompressedLength,
+     int forReduce, Set<TezTaskAttemptID> remaining, TezTaskAttemptID mapId) {
+   // Neither length may be negative
+   boolean lengthsValid = compressedLength >= 0 && decompressedLength >= 0;
+   if (!lengthsValid) {
+     wrongLengthErrs.increment(1);
+     LOG.warn(getName() + " invalid lengths in map output header: id: " +
+         mapId + " len: " + compressedLength + ", decomp len: " +
+         decompressedLength);
+     return false;
+   }
+
+   // The data must be addressed to this fetcher's reduce
+   if (reduce != forReduce) {
+     wrongReduceErrs.increment(1);
+     LOG.warn(getName() + " data for the wrong reduce map: " +
+         mapId + " len: " + compressedLength + " decomp len: " +
+         decompressedLength + " for reduce " + forReduce);
+     return false;
+   }
+
+   // The map must be one we are still waiting for
+   boolean expected = remaining.contains(mapId);
+   if (!expected) {
+     wrongMapErrs.increment(1);
+     LOG.warn("Invalid map-output! Received output for " + mapId);
+     return false;
+   }
+
+   return true;
+ }
+
+ /**
+  * Builds the URL used to fetch map outputs: the host's base URL followed
+  * by the given map ids, separated by commas.
+  *
+  * @param host the host to fetch from
+  * @param maps the map task attempts to request
+  * @return the fetch URL
+  * @throws MalformedURLException if the assembled string is not a valid URL
+  */
+ private URL getMapOutputURL(MapHost host, List<TezTaskAttemptID> maps
+     ) throws MalformedURLException {
+   StringBuilder spec = new StringBuilder(host.getBaseUrl());
+
+   // No separator before the first id, a comma before every later one
+   String separator = "";
+   for (TezTaskAttemptID mapId : maps) {
+     spec.append(separator).append(mapId);
+     separator = ",";
+   }
+
+   String urlString = spec.toString();
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("MapOutput URL for " + host + " -> " + urlString);
+   }
+   return new URL(urlString);
+ }
+
+ /**
+  * Establishes the connection in several short attempts instead of one long
+  * one: rather than connecting once with a timeout of X, we connect with a
+  * timeout of x < X repeatedly and give up only when the whole budget of X
+  * has been charged.
+  *
+  * Each failed attempt is charged one full slice against the budget. With a
+  * connectionTimeout of 0 the slice is 0 as well, so the first failure is
+  * rethrown immediately.
+  *
+  * @param connection connection to establish
+  * @param connectionTimeout total connect budget in milliseconds, must not
+  *                          be negative
+  * @throws IOException if the timeout is negative, or with the failure of
+  *                     the last attempt once the budget is used up
+  */
+ private void connect(URLConnection connection, int connectionTimeout)
+     throws IOException {
+   if (connectionTimeout < 0) {
+     throw new IOException("Invalid timeout "
+         + "[timeout = " + connectionTimeout + " ms]");
+   }
+
+   // Per-attempt timeout; stays 0 when no overall timeout was requested.
+   int slice = (connectionTimeout > 0)
+       ? Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout)
+       : 0;
+   connection.setConnectTimeout(slice);
+
+   // Connect budget that is still left.
+   int budget = connectionTimeout;
+   for (;;) {
+     try {
+       connection.connect();
+       return;
+     } catch (IOException ioe) {
+       // Charge the attempt that just failed.
+       budget -= slice;
+
+       // Give up once the whole budget has been spent; note that the
+       // updated value of the budget is used here.
+       if (budget == 0) {
+         throw ioe;
+       }
+
+       // Shrink the slice so the final attempt fits the remaining budget.
+       if (budget < slice) {
+         slice = budget;
+         connection.setConnectTimeout(slice);
+       }
+     }
+   }
+ }
+
+ /**
+  * Reads one map output from the given stream into the memory buffer
+  * reserved by the MapOutput, filling that buffer completely.
+  *
+  * @param host host the output is fetched from (not used here)
+  * @param mapOutput destination whose memory buffer receives the data
+  * @param input stream positioned at the start of the map output
+  * @param decompressedLength decompressed size of the output (not used here)
+  * @param compressedLength number of bytes handed to the IFileInputStream
+  * @throws IOException if the buffer cannot be filled; the stream chain is
+  *                     closed before the exception is rethrown
+  */
+ private void shuffleToMemory(MapHost host, MapOutput mapOutput,
+     InputStream input,
+     int decompressedLength,
+     int compressedLength) throws IOException {
+   // Innermost layer: the IFile stream over the raw connection stream.
+   InputStream source = new IFileInputStream(input, compressedLength, job);
+
+   // Are map-outputs compressed? Then decompress on top of it.
+   if (codec != null) {
+     decompressor.reset();
+     source = codec.createInputStream(source, decompressor);
+   }
+
+   // Destination: the in-memory buffer owned by the map output.
+   final byte[] target = mapOutput.getMemory();
+
+   try {
+     IOUtils.readFully(source, target, 0, target.length);
+     metrics.inputBytes(target.length);
+     reporter.progress();
+     LOG.info("Read " + target.length + " bytes from map-output for " +
+         mapOutput.getMapId());
+   } catch (IOException ioe) {
+     // Close the whole stream chain, then let the caller see the failure.
+     IOUtils.cleanup(LOG, source);
+     throw ioe;
+   }
+
+ }
+
+ /**
+  * Copies exactly compressedLength bytes of one map output from the given
+  * stream to the MapOutput's disk stream, in chunks of at most 64 KB, and
+  * closes the disk stream when the copy is complete.
+  *
+  * @param host host the output is fetched from (used in the error message)
+  * @param mapOutput destination providing the disk output stream
+  * @param input stream positioned at the start of the map output
+  * @param compressedLength number of bytes to copy
+  * @throws IOException if the stream ends early or any read/write fails;
+  *                     both streams are closed before the rethrow
+  */
+ private void shuffleToDisk(MapHost host, MapOutput mapOutput,
+     InputStream input,
+     long compressedLength)
+     throws IOException {
+   // Copy data to local-disk
+   OutputStream sink = mapOutput.getDisk();
+   long outstanding = compressedLength;
+   try {
+     final int chunkSize = 64 * 1024;
+     final byte[] chunk = new byte[chunkSize];
+     while (outstanding > 0) {
+       // Never ask for more than is still owed for this map output.
+       final int wanted = (int) Math.min(outstanding, chunkSize);
+       final int got = input.read(chunk, 0, wanted);
+       if (got < 0) {
+         throw new IOException("read past end of stream reading " +
+             mapOutput.getMapId());
+       }
+       sink.write(chunk, 0, got);
+       outstanding -= got;
+       metrics.inputBytes(got);
+       reporter.progress();
+     }
+
+     LOG.info("Read " + (compressedLength - outstanding) +
+         " bytes from map-output for " +
+         mapOutput.getMapId());
+
+     sink.close();
+   } catch (IOException ioe) {
+     // Close both ends, then let the caller see the failure.
+     IOUtils.cleanup(LOG, input, sink);
+     throw ioe;
+   }
+
+   // Sanity check: everything that was announced must have been copied.
+   if (outstanding != 0) {
+     throw new IOException("Incomplete map output received for " +
+         mapOutput.getMapId() + " from " +
+         host.getHostName() + " (" +
+         outstanding + " bytes missing of " +
+         compressedLength + ")"
+     );
+   }
+ }
+}
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.IFile.Reader;
+import org.apache.tez.records.TezTaskAttemptID;
+
+/**
+ * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
+ *
+ * The reader works on the region of the backing array that begins at
+ * <code>start</code> and ends just before <code>length</code>: the
+ * constructor reports <code>length - start</code> as the file length, so
+ * <code>length</code> is an end offset rather than a byte count.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InMemoryReader extends Reader {
+  private final TezTaskAttemptID taskAttemptId;
+  private final MergeManager merger;
+  DataInputBuffer memDataIn = new DataInputBuffer();
+  private int start;
+  private int length;
+  // Position of the most recent new key, needed to replay it for RLE records.
+  private int prevKeyPos;
+
+  /**
+   * @param merger merge manager told about the released memory on close();
+   *               may be null
+   * @param taskAttemptId attempt that produced the data (used for dumps)
+   * @param data backing array holding the map output
+   * @param start offset of the first byte of the map output in data
+   * @param length end offset (exclusive) of the map output in data
+   * @throws IOException declared for the superclass constructor
+   */
+  public InMemoryReader(MergeManager merger, TezTaskAttemptID taskAttemptId,
+                        byte[] data, int start, int length)
+  throws IOException {
+    super(null, null, length - start, null, null);
+    this.merger = merger;
+    this.taskAttemptId = taskAttemptId;
+
+    buffer = data;
+    bufferSize = (int)fileLength;
+    // DataInputBuffer.reset takes (array, start, byte count); hand it the
+    // size of the region, not its end offset, so reads stop where the data
+    // stops.
+    memDataIn.reset(buffer, start, length - start);
+    this.start = start;
+    this.length = length;
+  }
+
+  /**
+   * Repositions the reader at the given offset within its region.
+   *
+   * @param offset byte offset relative to the start of the region
+   */
+  @Override
+  public void reset(int offset) {
+    // Only the bytes between the new position and the end of the region
+    // remain readable.
+    memDataIn.reset(buffer, start + offset, length - start - offset);
+    bytesRead = offset;
+    eof = false;
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    // InMemoryReader does not initialize streams like Reader, so in.getPos()
+    // would not work. Instead, return the number of uncompressed bytes read,
+    // which will be correct since in-memory data is not compressed.
+    return bytesRead;
+  }
+
+  @Override
+  public long getLength() {
+    return fileLength;
+  }
+
+  /**
+   * Best-effort dump of the reader's region to ../output/&lt;attempt&gt;.dump
+   * for post-mortem analysis. A failure to write the dump is only reported
+   * on stderr.
+   */
+  private void dumpOnError() {
+    File dumpFile = new File("../output/" + taskAttemptId + ".dump");
+    System.err.println("Dumping corrupt map-output of " + taskAttemptId +
+                       " to " + dumpFile.getAbsolutePath());
+    try {
+      FileOutputStream fos = new FileOutputStream(dumpFile);
+      try {
+        // Dump the region this reader covers, beginning at its own start.
+        fos.write(buffer, start, bufferSize);
+      } finally {
+        // Release the file handle even when the write fails.
+        fos.close();
+      }
+    } catch (IOException ioe) {
+      System.err.println("Failed to dump map-output of " + taskAttemptId);
+    }
+  }
+
+  /**
+   * Points key at the bytes of the next record's key without copying them.
+   *
+   * @param key buffer reset to the key bytes inside the backing array
+   * @return NO_KEY when there is no further record, SAME_KEY when the record
+   *         repeats the previous key (RLE marker), NEW_KEY otherwise
+   * @throws IOException on a malformed record; the data is dumped first
+   */
+  public KeyState readRawKey(DataInputBuffer key) throws IOException {
+    try {
+      if (!positionToNextRecord(memDataIn)) {
+        return KeyState.NO_KEY;
+      }
+      // Setup the key
+      int pos = memDataIn.getPosition();
+      byte[] data = memDataIn.getData();
+      if(currentKeyLength == IFile.RLE_MARKER) {
+        // Repeated key: replay the previous key, nothing to skip.
+        key.reset(data, prevKeyPos, prevKeyLength);
+        currentKeyLength = prevKeyLength;
+        return KeyState.SAME_KEY;
+      }
+      key.reset(data, pos, currentKeyLength);
+      prevKeyPos = pos;
+      // Position for the next value
+      long skipped = memDataIn.skip(currentKeyLength);
+      if (skipped != currentKeyLength) {
+        throw new IOException("Rec# " + recNo +
+            ": Failed to skip past key of length: " +
+            currentKeyLength);
+      }
+
+      // Record the bytes consumed by the key
+      bytesRead += currentKeyLength;
+      return KeyState.NEW_KEY;
+    } catch (IOException ioe) {
+      dumpOnError();
+      throw ioe;
+    }
+  }
+
+  /**
+   * Points value at the bytes of the current record's value without copying
+   * them and advances to the next record.
+   *
+   * @param value buffer reset to the value bytes inside the backing array
+   * @throws IOException on a malformed record; the data is dumped first
+   */
+  public void nextRawValue(DataInputBuffer value) throws IOException {
+    try {
+      int pos = memDataIn.getPosition();
+      byte[] data = memDataIn.getData();
+      value.reset(data, pos, currentValueLength);
+
+      // Position for the next record
+      long skipped = memDataIn.skip(currentValueLength);
+      if (skipped != currentValueLength) {
+        throw new IOException("Rec# " + recNo +
+            ": Failed to skip past value of length: " +
+            currentValueLength);
+      }
+      // Record the bytes consumed by the value
+      bytesRead += currentValueLength;
+
+      ++recNo;
+    } catch (IOException ioe) {
+      dumpOnError();
+      throw ioe;
+    }
+  }
+
+  /**
+   * Drops the references to the data and, when a merge manager was given,
+   * tells it that bufferSize bytes are no longer reserved.
+   */
+  public void close() {
+    // Release
+    dataIn = null;
+    buffer = null;
+    // Inform the MergeManager
+    if (merger != null) {
+      merger.unreserve(bufferSize);
+    }
+  }
+}
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.IFileOutputStream;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+
+/**
+ * Writer that appends already-serialized key/value pairs to an in-memory
+ * stream, wrapped in an IFileOutputStream. Each record is written as two
+ * vints (key length or RLE marker, value length) followed by the raw bytes;
+ * close() terminates the stream with a pair of EOF markers.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InMemoryWriter extends Writer {
+  private static final Log LOG = LogFactory.getLog(InMemoryWriter.class);
+
+  private DataOutputStream out;
+
+  /**
+   * @param arrayStream bounded in-memory stream that receives the records
+   */
+  public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
+    super(null);
+    this.out =
+      new DataOutputStream(new IFileOutputStream(arrayStream));
+  }
+
+  /**
+   * Object-based append is not supported by this writer.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public void append(Object key, Object value) throws IOException {
+    throw new UnsupportedOperationException
+    ("InMemoryWriter.append(K key, V value)");
+  }
+
+  /**
+   * Appends one record given as raw bytes. Passing IFile.REPEAT_KEY as the
+   * key writes the RLE marker instead of the key bytes.
+   *
+   * @param key buffer whose bytes from its position to its length form the key
+   * @param value buffer whose bytes from its position to its length form the
+   *              value
+   * @throws IOException if either computed length is negative or the write
+   *                     fails
+   */
+  public void append(DataInputBuffer key, DataInputBuffer value)
+  throws IOException {
+    int keyLength = key.getLength() - key.getPosition();
+    if (keyLength < 0) {
+      throw new IOException("Negative key-length not allowed: " + keyLength +
+                            " for " + key);
+    }
+
+    // Identity comparison on purpose: REPEAT_KEY is a sentinel instance.
+    boolean sameKey = (key == IFile.REPEAT_KEY);
+
+    int valueLength = value.getLength() - value.getPosition();
+    if (valueLength < 0) {
+      throw new IOException("Negative value-length not allowed: " +
+                            valueLength + " for " + value);
+    }
+
+    if(sameKey) {
+      WritableUtils.writeVInt(out, IFile.RLE_MARKER);
+      WritableUtils.writeVInt(out, valueLength);
+      out.write(value.getData(), value.getPosition(), valueLength);
+    } else {
+      // Per-record trace: keep it out of the normal logs and skip the
+      // string building entirely unless debug logging is switched on.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX InMemWriter.append" +
+            " key.data=" + key.getData() +
+            " key.pos=" + key.getPosition() +
+            " key.len=" +key.getLength() +
+            " val.data=" + value.getData() +
+            " val.pos=" + value.getPosition() +
+            " val.len=" + value.getLength());
+      }
+      WritableUtils.writeVInt(out, keyLength);
+      WritableUtils.writeVInt(out, valueLength);
+      out.write(key.getData(), key.getPosition(), keyLength);
+      out.write(value.getData(), value.getPosition(), valueLength);
+    }
+
+  }
+
+  /**
+   * Writes the end-of-file markers and closes the underlying stream.
+   *
+   * @throws IOException if writing the markers or closing fails
+   */
+  public void close() throws IOException {
+    // Write EOF_MARKER for key/value length
+    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
+    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
+
+    // Close the stream
+    out.close();
+    out = null;
+  }
+
+}
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.records.TezTaskAttemptID;
+
+
+/**
+ * Bookkeeping for one host that serves map outputs: its name, the base URL
+ * used to fetch from it, the map attempts known to have output there, and
+ * its current fetch state. All access to the mutable state goes through
+ * synchronized methods.
+ */
+class MapHost {
+
+  public static enum State {
+    IDLE,               // No map outputs available
+    BUSY,               // Map outputs are being fetched
+    PENDING,            // Known map outputs which need to be fetched
+    PENALIZED           // Host penalized due to shuffle failures
+  }
+
+  private State state = State.IDLE;
+  private final String hostName;
+  private final String baseUrl;
+  private List<TezTaskAttemptID> maps = new ArrayList<TezTaskAttemptID>();
+
+  /**
+   * @param hostName name of the host
+   * @param baseUrl base URL for fetching map outputs from the host
+   */
+  public MapHost(String hostName, String baseUrl) {
+    this.hostName = hostName;
+    this.baseUrl = baseUrl;
+  }
+
+  /**
+   * @return the host's current state; synchronized like every writer of the
+   *         state so the most recent transition is always observed
+   */
+  public synchronized State getState() {
+    return state;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public String getBaseUrl() {
+    return baseUrl;
+  }
+
+  /**
+   * Records a map attempt with output on this host; an IDLE host becomes
+   * PENDING, any other state is left alone.
+   *
+   * @param mapId map attempt whose output is available on this host
+   */
+  public synchronized void addKnownMap(TezTaskAttemptID mapId) {
+    maps.add(mapId);
+    if (state == State.IDLE) {
+      state = State.PENDING;
+    }
+  }
+
+  /**
+   * Hands the current list of known maps to the caller and starts a fresh,
+   * empty one. The state is not changed.
+   *
+   * @return the maps known up to this call
+   */
+  public synchronized List<TezTaskAttemptID> getAndClearKnownMaps() {
+    List<TezTaskAttemptID> currentKnownMaps = maps;
+    maps = new ArrayList<TezTaskAttemptID>();
+    return currentKnownMaps;
+  }
+
+  public synchronized void markBusy() {
+    state = State.BUSY;
+  }
+
+  public synchronized void markPenalized() {
+    state = State.PENALIZED;
+  }
+
+  public synchronized int getNumKnownMapOutputs() {
+    return maps.size();
+  }
+
+  /**
+   * Called when the node is done with its penalty or done copying.
+   * @return the host's new state
+   */
+  public synchronized State markAvailable() {
+    if (maps.isEmpty()) {
+      state = State.IDLE;
+    } else {
+      state = State.PENDING;
+    }
+    return state;
+  }
+
+  @Override
+  public String toString() {
+    return hostName;
+  }
+
+  /**
+   * Mark the host as penalized. Same effect as {@link #markPenalized()}.
+   */
+  public synchronized void penalize() {
+    markPenalized();
+  }
+}
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.records.TezTaskAttemptID;
+
+/**
+ * Destination for one fetched map output. Depending on the constructor used
+ * it is backed by a local file (DISK), by a byte array (MEMORY), or by
+ * nothing at all (WAIT). Every instance gets a unique id from a shared
+ * counter; equals() and hashCode() are based on that id only.
+ */
+class MapOutput {
+  private static final Log LOG = LogFactory.getLog(MapOutput.class);
+  // Source of the per-instance ids; the reference itself never changes.
+  private static final AtomicInteger ID = new AtomicInteger(0);
+
+  public static enum Type {
+    WAIT,
+    MEMORY,
+    DISK
+  }
+
+  private final int id;
+
+  private final MergeManager merger;
+  private final TezTaskAttemptID mapId;
+
+  private final long size;
+
+  private final byte[] memory;
+  private BoundedByteArrayOutputStream byteStream;
+
+  private final FileSystem localFS;
+  private final Path tmpOutputPath;
+  private final Path outputPath;
+  private final OutputStream disk;
+
+  private final Type type;
+
+  private final boolean primaryMapOutput;
+
+  /**
+   * Creates a DISK output. The data is written to a temporary file (the
+   * final path suffixed with the fetcher number) that commit() renames to
+   * the final path and abort() deletes.
+   *
+   * @throws IOException if the local file system or the temporary file
+   *                     cannot be opened
+   */
+  MapOutput(TezTaskAttemptID mapId, MergeManager merger, long size,
+            Configuration conf, LocalDirAllocator localDirAllocator,
+            int fetcher, boolean primaryMapOutput,
+            TezTaskOutputFiles mapOutputFile)
+         throws IOException {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+    this.merger = merger;
+
+    type = Type.DISK;
+
+    memory = null;
+    byteStream = null;
+
+    this.size = size;
+
+    this.localFS = FileSystem.getLocal(conf);
+    outputPath =
+      mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
+    tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
+
+    disk = localFS.create(tmpOutputPath);
+
+    this.primaryMapOutput = primaryMapOutput;
+  }
+
+  /**
+   * Creates a MEMORY output backed by a bounded byte array of the given size.
+   */
+  MapOutput(TezTaskAttemptID mapId, MergeManager merger, int size,
+            boolean primaryMapOutput) {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+    this.merger = merger;
+
+    type = Type.MEMORY;
+    byteStream = new BoundedByteArrayOutputStream(size);
+    memory = byteStream.getBuffer();
+
+    this.size = size;
+
+    localFS = null;
+    disk = null;
+    outputPath = null;
+    tmpOutputPath = null;
+
+    this.primaryMapOutput = primaryMapOutput;
+  }
+
+  /**
+   * Creates a WAIT output: a placeholder without any backing store, which
+   * can be neither committed nor aborted.
+   */
+  public MapOutput(TezTaskAttemptID mapId) {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+
+    type = Type.WAIT;
+    merger = null;
+    memory = null;
+    byteStream = null;
+
+    size = -1;
+
+    localFS = null;
+    disk = null;
+    outputPath = null;
+    tmpOutputPath = null;
+
+    this.primaryMapOutput = false;
+  }
+
+  public boolean isPrimaryMapOutput() {
+    return primaryMapOutput;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof MapOutput) {
+      return id == ((MapOutput)obj).id;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return id;
+  }
+
+  public Path getOutputPath() {
+    return outputPath;
+  }
+
+  public byte[] getMemory() {
+    return memory;
+  }
+
+  public BoundedByteArrayOutputStream getArrayStream() {
+    return byteStream;
+  }
+
+  public OutputStream getDisk() {
+    return disk;
+  }
+
+  public TezTaskAttemptID getMapId() {
+    return mapId;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  public long getSize() {
+    return size;
+  }
+
+  /**
+   * Hands the completed output over to the merge manager. A DISK output is
+   * first renamed from its temporary path to its final path.
+   *
+   * @throws IOException if the rename fails or the output is of type WAIT
+   */
+  public void commit() throws IOException {
+    if (type == Type.MEMORY) {
+      merger.closeInMemoryFile(this);
+    } else if (type == Type.DISK) {
+      // rename() signals failure through its return value; do not hand a
+      // path to the merger that the file never reached.
+      if (!localFS.rename(tmpOutputPath, outputPath)) {
+        throw new IOException("Failed to rename " + tmpOutputPath +
+            " to " + outputPath);
+      }
+      merger.closeOnDiskFile(outputPath);
+    } else {
+      throw new IOException("Cannot commit MapOutput of type WAIT!");
+    }
+  }
+
+  /**
+   * Discards the output: a MEMORY output returns its reservation to the
+   * merge manager, a DISK output deletes its temporary file (a failure to
+   * delete is only logged).
+   *
+   * @throws IllegalArgumentException if the output is of type WAIT
+   */
+  public void abort() {
+    if (type == Type.MEMORY) {
+      merger.unreserve(memory.length);
+    } else if (type == Type.DISK) {
+      try {
+        localFS.delete(tmpOutputPath, false);
+      } catch (IOException ie) {
+        LOG.info("failure to clean up " + tmpOutputPath, ie);
+      }
+    } else {
+      throw new IllegalArgumentException
+                   ("Cannot abort MapOutput of type WAIT!");
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MapOutput(" + mapId + ", " + type + ")";
+  }
+
+  /**
+   * Orders map outputs by size, smallest first, and falls back to the id
+   * for outputs of equal size; only an output compared with itself (same
+   * id) yields 0.
+   */
+  public static class MapOutputComparator
+  implements Comparator<MapOutput> {
+    public int compare(MapOutput o1, MapOutput o2) {
+      if (o1.id == o2.id) {
+        return 0;
+      }
+
+      if (o1.size < o2.size) {
+        return -1;
+      } else if (o1.size > o2.size) {
+        return 1;
+      }
+
+      // Same size: the ids differ here, so this never returns 0.
+      if (o1.id < o2.id) {
+        return -1;
+      } else {
+        return 1;
+      }
+    }
+  }
+
+}