You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ka...@apache.org on 2014/05/13 21:14:08 UTC

svn commit: r1594332 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle...

Author: kasha
Date: Tue May 13 19:14:07 2014
New Revision: 1594332

URL: http://svn.apache.org/r1594332
Log:
MAPREDUCE-5652. NM Recovery. ShuffleHandler should handle NM restarts. (Jason Lowe via kasha)

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/proto/
      - copied from r1594329, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/proto/
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/LICENSE.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/pom.xml

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1594332&r1=1594331&r2=1594332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue May 13 19:14:07 2014
@@ -52,6 +52,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5774. Job overview in History UI should list reducer phases in
     chronological order. (Gera Shegalov via kasha)
 
+    MAPREDUCE-5652. NM Recovery. ShuffleHandler should handle NM restarts.
+    (Jason Lowe via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/LICENSE.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/LICENSE.txt?rev=1594332&r1=1594331&r2=1594332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/LICENSE.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/LICENSE.txt Tue May 13 19:14:07 2014
@@ -242,3 +242,100 @@ For the org.apache.hadoop.util.bloom.* c
  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  * POSSIBILITY OF SUCH DAMAGE.
  */
+
+The binary distribution of this product bundles binaries of leveldbjni
+(https://github.com/fusesource/leveldbjni), which is available under the
+following license:
+
+Copyright (c) 2011 FuseSource Corp. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of FuseSource Corp. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+The binary distribution of this product bundles binaries of leveldb
+(http://code.google.com/p/leveldb/), which is available under the following
+license:
+
+Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+The binary distribution of this product bundles binaries of snappy
+(http://code.google.com/p/snappy/), which is available under the following
+license:
+
+Copyright 2011, Google Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+    * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1594332&r1=1594331&r2=1594332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Tue May 13 19:14:07 2014
@@ -465,6 +465,9 @@
   <Match>
     <Class name="~org\.apache\.hadoop\.mapreduce\.v2\.proto.*" />
   </Match>
+  <Match>
+    <Package name="org.apache.hadoop.mapred.proto" />
+  </Match>
   
    <!--
      The below fields are accessed locally and only via methods that are synchronized. 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java?rev=1594332&r1=1594331&r2=1594332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java Tue May 13 19:14:07 2014
@@ -48,7 +48,7 @@ import org.apache.hadoop.io.Text;
 @InterfaceStability.Stable
 public class JobID extends org.apache.hadoop.mapred.ID 
                    implements Comparable<ID> {
-  protected static final String JOB = "job";
+  public static final String JOB = "job";
   
   // Jobid regex for various tools and framework components
   public static final String JOBID_REGEX = 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml?rev=1594332&r1=1594331&r2=1594332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml Tue May 13 19:14:07 2014
@@ -35,12 +35,52 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-nodemanager</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>ShuffleHandlerRecovery.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1594332&r1=1594331&r2=1594332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Tue May 13 19:14:07 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
 import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
@@ -60,6 +62,8 @@ import org.apache.hadoop.io.DataInputByt
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
@@ -72,6 +76,7 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
@@ -81,7 +86,14 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -115,6 +127,7 @@ import org.mortbay.jetty.HttpHeaders;
 
 import com.google.common.base.Charsets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
 
 public class ShuffleHandler extends AuxiliaryService {
 
@@ -132,6 +145,10 @@ public class ShuffleHandler extends Auxi
       "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
       Pattern.CASE_INSENSITIVE);
 
+  private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
+  private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
+  private static final String STATE_DB_SCHEMA_VERSION = "1.0";
+
   private int port;
   private ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -149,14 +166,14 @@ public class ShuffleHandler extends Auxi
   private boolean shuffleTransferToAllowed;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
+  private Map<String,String> userRsrc;
+  private JobTokenSecretManager secretManager;
+
+  private DB stateDb = null;
+
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce_shuffle";
 
-  private static final Map<String,String> userRsrc =
-    new ConcurrentHashMap<String,String>();
-  private static final JobTokenSecretManager secretManager =
-    new JobTokenSecretManager();
-
   public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
   public static final int DEFAULT_SHUFFLE_PORT = 13562;
 
@@ -292,9 +309,7 @@ public class ShuffleHandler extends Auxi
       Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
        // TODO: Once SHuffle is out of NM, this can use MR APIs
       JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
-      userRsrc.put(jobId.toString(), user);
-      LOG.info("Added token for " + jobId.toString());
-      secretManager.addTokenForJob(jobId.toString(), jt);
+      recordJobShuffleInfo(jobId, user, jt);
     } catch (IOException e) {
       LOG.error("Error during initApp", e);
       // TODO add API to AuxiliaryServices to report failures
@@ -305,8 +320,12 @@ public class ShuffleHandler extends Auxi
   public void stopApplication(ApplicationTerminationContext context) {
     ApplicationId appId = context.getApplicationId();
     JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
-    secretManager.removeTokenForJob(jobId.toString());
-    userRsrc.remove(jobId.toString());
+    try {
+      removeJobShuffleInfo(jobId);
+    } catch (IOException e) {
+      LOG.error("Error during stopApp", e);
+      // TODO add API to AuxiliaryServices to report failures
+    }
   }
 
   @Override
@@ -350,6 +369,9 @@ public class ShuffleHandler extends Auxi
   @Override
   protected void serviceStart() throws Exception {
     Configuration conf = getConfig();
+    userRsrc = new ConcurrentHashMap<String,String>();
+    secretManager = new JobTokenSecretManager();
+    recoverState(conf);
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
     try {
       pipelineFact = new HttpPipelineFactory(conf);
@@ -389,6 +411,9 @@ public class ShuffleHandler extends Auxi
     if (pipelineFact != null) {
       pipelineFact.destroy();
     }
+    if (stateDb != null) {
+      stateDb.close();
+    }
     super.serviceStop();
   }
 
@@ -407,6 +432,140 @@ public class ShuffleHandler extends Auxi
     return new Shuffle(conf);
   }
 
+  private void recoverState(Configuration conf) throws IOException {
+    Path recoveryRoot = getRecoveryPath();
+    if (recoveryRoot != null) {
+      startStore(recoveryRoot);
+      Pattern jobPattern = Pattern.compile(JobID.JOBID_REGEX);
+      LeveldbIterator iter = null;
+      try {
+        iter = new LeveldbIterator(stateDb);
+        iter.seek(bytes(JobID.JOB));
+        while (iter.hasNext()) {
+          Map.Entry<byte[],byte[]> entry = iter.next();
+          String key = asString(entry.getKey());
+          if (!jobPattern.matcher(key).matches()) {
+            break;
+          }
+          recoverJobShuffleInfo(key, entry.getValue());
+        }
+      } catch (DBException e) {
+        throw new IOException("Database error during recovery", e);
+      } finally {
+        if (iter != null) {
+          iter.close();
+        }
+      }
+    }
+  }
+
+  private void startStore(Path recoveryRoot) throws IOException {
+    Options options = new Options();
+    options.createIfMissing(false);
+    options.logger(new LevelDBLogger());
+    Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
+    LOG.info("Using state database at " + dbPath + " for recovery");
+    File dbfile = new File(dbPath.toString());
+    byte[] schemaVersionData;
+    try {
+      stateDb = JniDBFactory.factory.open(dbfile, options);
+      schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
+    } catch (NativeDB.DBException e) {
+      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+        LOG.info("Creating state database at " + dbfile);
+        options.createIfMissing(true);
+        try {
+          stateDb = JniDBFactory.factory.open(dbfile, options);
+          schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
+          stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
+        } catch (DBException dbExc) {
+          throw new IOException("Unable to create state store", dbExc);
+        }
+      } else {
+        throw e;
+      }
+    }
+    if (schemaVersionData != null) {
+      String schemaVersion = asString(schemaVersionData);
+      // only support exact schema matches for now
+      if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
+        throw new IOException("Incompatible state database schema, found "
+            + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
+      }
+    } else {
+      throw new IOException("State database schema version not found");
+    }
+  }
+
+  private void addJobToken(JobID jobId, String user,
+      Token<JobTokenIdentifier> jobToken) {
+    userRsrc.put(jobId.toString(), user);
+    secretManager.addTokenForJob(jobId.toString(), jobToken);
+    LOG.info("Added token for " + jobId.toString());
+  }
+
+  private void recoverJobShuffleInfo(String jobIdStr, byte[] data)
+      throws IOException {
+    JobID jobId;
+    try {
+      jobId = JobID.forName(jobIdStr);
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Bad job ID " + jobIdStr + " in state store", e);
+    }
+
+    JobShuffleInfoProto proto = JobShuffleInfoProto.parseFrom(data);
+    String user = proto.getUser();
+    TokenProto tokenProto = proto.getJobToken();
+    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+        tokenProto.getIdentifier().toByteArray(),
+        tokenProto.getPassword().toByteArray(),
+        new Text(tokenProto.getKind()), new Text(tokenProto.getService()));
+    addJobToken(jobId, user, jobToken);
+  }
+
+  private void recordJobShuffleInfo(JobID jobId, String user,
+      Token<JobTokenIdentifier> jobToken) throws IOException {
+    if (stateDb != null) {
+      TokenProto tokenProto = TokenProto.newBuilder()
+          .setIdentifier(ByteString.copyFrom(jobToken.getIdentifier()))
+          .setPassword(ByteString.copyFrom(jobToken.getPassword()))
+          .setKind(jobToken.getKind().toString())
+          .setService(jobToken.getService().toString())
+          .build();
+      JobShuffleInfoProto proto = JobShuffleInfoProto.newBuilder()
+          .setUser(user).setJobToken(tokenProto).build();
+      try {
+        stateDb.put(bytes(jobId.toString()), proto.toByteArray());
+      } catch (DBException e) {
+        throw new IOException("Error storing " + jobId, e);
+      }
+    }
+    addJobToken(jobId, user, jobToken);
+  }
+
+  private void removeJobShuffleInfo(JobID jobId) throws IOException {
+    String jobIdStr = jobId.toString();
+    secretManager.removeTokenForJob(jobIdStr);
+    userRsrc.remove(jobIdStr);
+    if (stateDb != null) {
+      try {
+        stateDb.delete(bytes(jobIdStr));
+      } catch (DBException e) {
+        throw new IOException("Unable to remove " + jobId
+            + " from state store", e);
+      }
+    }
+  }
+
+  private static class LevelDBLogger implements Logger {
+    private static final Log LOG = LogFactory.getLog(LevelDBLogger.class);
+
+    @Override
+    public void log(String message) {
+      LOG.info(message);
+    }
+  }
+
   class HttpPipelineFactory implements ChannelPipelineFactory {
 
     final Shuffle SHUFFLE;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1594332&r1=1594331&r2=1594332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Tue May 13 19:14:07 2014
@@ -51,11 +51,15 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -68,6 +72,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
@@ -645,4 +650,93 @@ public class TestShuffleHandler {
     output.writeLong(chk.getChecksum().getValue());
     output.close();
   }
+
+  @Test
+  public void testRecovery() throws IOException {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
+    final File tmpDir = new File(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir")),
+        TestShuffleHandler.class.getName());
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    ShuffleHandler shuffle = new ShuffleHandler();
+    // emulate aux services startup with recovery enabled
+    shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+    tmpDir.mkdirs();
+    try {
+      shuffle.init(conf);
+      shuffle.start();
+
+      // setup a shuffle token for an application
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+          "identifier".getBytes(), "password".getBytes(), new Text(user),
+          new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffle.initializeApplication(new ApplicationInitializationContext(user,
+          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+            outputBuffer.getLength())));
+
+      // verify we are authorized to shuffle
+      int rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+      // emulate shuffle handler restart
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+      shuffle.start();
+
+      // verify we are still authorized to shuffle to the old application
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+      // shutdown app and verify access is lost
+      shuffle.stopApplication(new ApplicationTerminationContext(appId));
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
+
+      // emulate shuffle handler restart
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+      shuffle.start();
+
+      // verify we still don't have access
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
+    } finally {
+      if (shuffle != null) {
+        shuffle.close();
+      }
+      FileUtil.fullyDelete(tmpDir);
+    }
+  }
+
+  private static int getShuffleResponseCode(ShuffleHandler shuffle,
+      Token<JobTokenIdentifier> jt) throws IOException {
+    URL url = new URL("http://127.0.0.1:"
+        + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+        + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    String encHash = SecureShuffleUtils.hashFromString(
+        SecureShuffleUtils.buildMsgFrom(url),
+        JobTokenSecretManager.createSecretKey(jt.getPassword()));
+    conn.addRequestProperty(
+        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    conn.connect();
+    int rc = conn.getResponseCode();
+    conn.disconnect();
+    return rc;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/pom.xml?rev=1594332&r1=1594331&r2=1594332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/pom.xml Tue May 13 19:14:07 2014
@@ -162,6 +162,10 @@
       <artifactId>hsqldb</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+    </dependency>
 
   </dependencies>