Posted to mapreduce-commits@hadoop.apache.org by ka...@apache.org on 2014/05/13 21:10:48 UTC
svn commit: r1594329 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/ hadoop-map...
Author: kasha
Date: Tue May 13 19:10:48 2014
New Revision: 1594329
URL: http://svn.apache.org/r1594329
Log:
MAPREDUCE-5652. NM Recovery. ShuffleHandler should handle NM restarts. (Jason Lowe via kasha)
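In short: the ShuffleHandler now persists each job's shuffle secret (the submitting user plus the serialized job token) in a LevelDB store under the NodeManager's recovery directory and replays that store on startup, so reducers can keep fetching map output from jobs that span an NM restart. A condensed sketch of the flow the diff below implements (not the literal code):

    // initializeApplication: persist first, then register in memory
    recordJobShuffleInfo(jobId, user, jobToken);  // stateDb.put(jobId, JobShuffleInfoProto) + addJobToken()
    // serviceStart after an NM restart: replay every record under the "job" key prefix
    recoverState(conf);                           // parse each JobShuffleInfoProto, re-add user and token
    // stopApplication: drop the in-memory token and delete the persisted record
    removeJobShuffleInfo(jobId);                  // secretManager.removeTokenForJob() + stateDb.delete(jobId)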
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/proto/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/proto/ShuffleHandlerRecovery.proto
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/LICENSE.txt
hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/pom.xml
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1594329&r1=1594328&r2=1594329&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue May 13 19:10:48 2014
@@ -191,6 +191,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5774. Job overview in History UI should list reducer phases in
chronological order. (Gera Shegalov via kasha)
+ MAPREDUCE-5652. NM Recovery. ShuffleHandler should handle NM restarts.
+ (Jason Lowe via kasha)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/LICENSE.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/LICENSE.txt?rev=1594329&r1=1594328&r2=1594329&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/LICENSE.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/LICENSE.txt Tue May 13 19:10:48 2014
@@ -242,3 +242,100 @@ For the org.apache.hadoop.util.bloom.* c
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
+
+The binary distribution of this product bundles binaries of leveldbjni
+(https://github.com/fusesource/leveldbjni), which is available under the
+following license:
+
+Copyright (c) 2011 FuseSource Corp. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of FuseSource Corp. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+The binary distribution of this product bundles binaries of leveldb
+(http://code.google.com/p/leveldb/), which is available under the following
+license:
+
+Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+The binary distribution of this product bundles binaries of snappy
+(http://code.google.com/p/snappy/), which is available under the following
+license:
+
+Copyright 2011, Google Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1594329&r1=1594328&r2=1594329&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Tue May 13 19:10:48 2014
@@ -465,6 +465,9 @@
<Match>
<Class name="~org\.apache\.hadoop\.mapreduce\.v2\.proto.*" />
</Match>
+ <Match>
+ <Package name="org.apache.hadoop.mapred.proto" />
+ </Match>
<!--
The below fields are accessed locally and only via methods that are synchronized.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java?rev=1594329&r1=1594328&r2=1594329&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java Tue May 13 19:10:48 2014
@@ -48,7 +48,7 @@ import org.apache.hadoop.io.Text;
@InterfaceStability.Stable
public class JobID extends org.apache.hadoop.mapred.ID
implements Comparable<ID> {
- protected static final String JOB = "job";
+ public static final String JOB = "job";
// Jobid regex for various tools and framework components
public static final String JOBID_REGEX =
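Making JobID.JOB public lets ShuffleHandler.recoverState() (further down in this commit) seek directly to the job records: state-store keys are the string form of the job ID, which always starts with "job", and LevelDB iterates keys in lexicographic byte order. A sketch of the resulting key layout (the timestamps are illustrative):

    // Keys in mapreduce_shuffle_state, in LevelDB's sorted order:
    //   job_1400008248997_0001  -> JobShuffleInfoProto   <- iter.seek(bytes(JobID.JOB)) lands here
    //   job_1400008248997_0002  -> JobShuffleInfoProto
    //   schema-version          -> "1.0"  (sorts after every "job..." key since 's' > 'j';
    //                                      the JOBID_REGEX mismatch ends the scan here)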
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml?rev=1594329&r1=1594328&r2=1594329&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml Tue May 13 19:10:48 2014
@@ -35,12 +35,52 @@
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>ShuffleHandlerRecovery.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
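The compile-protoc execution added above runs protoc at generate-sources, importing the hadoop-common proto directory so that the new file's import of Security.proto resolves, and (per the java_package and java_outer_classname options in the .proto) emits the class that ShuffleHandler.java imports below:

    // Generated under ${project.build.directory}/generated-sources/java, which the
    // plugin registers as a compile source root:
    //   org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto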
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1594329&r1=1594328&r2=1594329&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Tue May 13 19:10:48 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
@@ -60,6 +62,8 @@ import org.apache.hadoop.io.DataInputByt
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
@@ -72,6 +76,7 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
@@ -81,7 +86,14 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -115,6 +127,7 @@ import org.mortbay.jetty.HttpHeaders;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
public class ShuffleHandler extends AuxiliaryService {
@@ -132,6 +145,10 @@ public class ShuffleHandler extends Auxi
"^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
Pattern.CASE_INSENSITIVE);
+ private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
+ private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
+ private static final String STATE_DB_SCHEMA_VERSION = "1.0";
+
private int port;
private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -149,14 +166,14 @@ public class ShuffleHandler extends Auxi
private boolean shuffleTransferToAllowed;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+ private Map<String,String> userRsrc;
+ private JobTokenSecretManager secretManager;
+
+ private DB stateDb = null;
+
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
"mapreduce_shuffle";
- private static final Map<String,String> userRsrc =
- new ConcurrentHashMap<String,String>();
- private static final JobTokenSecretManager secretManager =
- new JobTokenSecretManager();
-
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 13562;
@@ -292,9 +309,7 @@ public class ShuffleHandler extends Auxi
Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
// TODO: Once SHuffle is out of NM, this can use MR APIs
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
- userRsrc.put(jobId.toString(), user);
- LOG.info("Added token for " + jobId.toString());
- secretManager.addTokenForJob(jobId.toString(), jt);
+ recordJobShuffleInfo(jobId, user, jt);
} catch (IOException e) {
LOG.error("Error during initApp", e);
// TODO add API to AuxiliaryServices to report failures
@@ -305,8 +320,12 @@ public class ShuffleHandler extends Auxi
public void stopApplication(ApplicationTerminationContext context) {
ApplicationId appId = context.getApplicationId();
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
- secretManager.removeTokenForJob(jobId.toString());
- userRsrc.remove(jobId.toString());
+ try {
+ removeJobShuffleInfo(jobId);
+ } catch (IOException e) {
+ LOG.error("Error during stopApp", e);
+ // TODO add API to AuxiliaryServices to report failures
+ }
}
@Override
@@ -350,6 +369,9 @@ public class ShuffleHandler extends Auxi
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
+ userRsrc = new ConcurrentHashMap<String,String>();
+ secretManager = new JobTokenSecretManager();
+ recoverState(conf);
ServerBootstrap bootstrap = new ServerBootstrap(selector);
try {
pipelineFact = new HttpPipelineFactory(conf);
@@ -389,6 +411,9 @@ public class ShuffleHandler extends Auxi
if (pipelineFact != null) {
pipelineFact.destroy();
}
+ if (stateDb != null) {
+ stateDb.close();
+ }
super.serviceStop();
}
@@ -407,6 +432,140 @@ public class ShuffleHandler extends Auxi
return new Shuffle(conf);
}
+ private void recoverState(Configuration conf) throws IOException {
+ Path recoveryRoot = getRecoveryPath();
+ if (recoveryRoot != null) {
+ startStore(recoveryRoot);
+ Pattern jobPattern = Pattern.compile(JobID.JOBID_REGEX);
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(stateDb);
+ iter.seek(bytes(JobID.JOB));
+ while (iter.hasNext()) {
+ Map.Entry<byte[],byte[]> entry = iter.next();
+ String key = asString(entry.getKey());
+ if (!jobPattern.matcher(key).matches()) {
+ break;
+ }
+ recoverJobShuffleInfo(key, entry.getValue());
+ }
+ } catch (DBException e) {
+ throw new IOException("Database error during recovery", e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+ }
+ }
+
+ private void startStore(Path recoveryRoot) throws IOException {
+ Options options = new Options();
+ options.createIfMissing(false);
+ options.logger(new LevelDBLogger());
+ Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
+ LOG.info("Using state database at " + dbPath + " for recovery");
+ File dbfile = new File(dbPath.toString());
+ byte[] schemaVersionData;
+ try {
+ stateDb = JniDBFactory.factory.open(dbfile, options);
+ schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
+ } catch (NativeDB.DBException e) {
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ LOG.info("Creating state database at " + dbfile);
+ options.createIfMissing(true);
+ try {
+ stateDb = JniDBFactory.factory.open(dbfile, options);
+ schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
+ stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
+ } catch (DBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ } else {
+ throw e;
+ }
+ }
+ if (schemaVersionData != null) {
+ String schemaVersion = asString(schemaVersionData);
+ // only support exact schema matches for now
+ if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
+ throw new IOException("Incompatible state database schema, found "
+ + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
+ }
+ } else {
+ throw new IOException("State database schema version not found");
+ }
+ }
+
+ private void addJobToken(JobID jobId, String user,
+ Token<JobTokenIdentifier> jobToken) {
+ userRsrc.put(jobId.toString(), user);
+ secretManager.addTokenForJob(jobId.toString(), jobToken);
+ LOG.info("Added token for " + jobId.toString());
+ }
+
+ private void recoverJobShuffleInfo(String jobIdStr, byte[] data)
+ throws IOException {
+ JobID jobId;
+ try {
+ jobId = JobID.forName(jobIdStr);
+ } catch (IllegalArgumentException e) {
+ throw new IOException("Bad job ID " + jobIdStr + " in state store", e);
+ }
+
+ JobShuffleInfoProto proto = JobShuffleInfoProto.parseFrom(data);
+ String user = proto.getUser();
+ TokenProto tokenProto = proto.getJobToken();
+ Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+ tokenProto.getIdentifier().toByteArray(),
+ tokenProto.getPassword().toByteArray(),
+ new Text(tokenProto.getKind()), new Text(tokenProto.getService()));
+ addJobToken(jobId, user, jobToken);
+ }
+
+ private void recordJobShuffleInfo(JobID jobId, String user,
+ Token<JobTokenIdentifier> jobToken) throws IOException {
+ if (stateDb != null) {
+ TokenProto tokenProto = TokenProto.newBuilder()
+ .setIdentifier(ByteString.copyFrom(jobToken.getIdentifier()))
+ .setPassword(ByteString.copyFrom(jobToken.getPassword()))
+ .setKind(jobToken.getKind().toString())
+ .setService(jobToken.getService().toString())
+ .build();
+ JobShuffleInfoProto proto = JobShuffleInfoProto.newBuilder()
+ .setUser(user).setJobToken(tokenProto).build();
+ try {
+ stateDb.put(bytes(jobId.toString()), proto.toByteArray());
+ } catch (DBException e) {
+ throw new IOException("Error storing " + jobId, e);
+ }
+ }
+ addJobToken(jobId, user, jobToken);
+ }
+
+ private void removeJobShuffleInfo(JobID jobId) throws IOException {
+ String jobIdStr = jobId.toString();
+ secretManager.removeTokenForJob(jobIdStr);
+ userRsrc.remove(jobIdStr);
+ if (stateDb != null) {
+ try {
+ stateDb.delete(bytes(jobIdStr));
+ } catch (DBException e) {
+ throw new IOException("Unable to remove " + jobId
+ + " from state store", e);
+ }
+ }
+ }
+
+ private static class LevelDBLogger implements Logger {
+ private static final Log LOG = LogFactory.getLog(LevelDBLogger.class);
+
+ @Override
+ public void log(String message) {
+ LOG.info(message);
+ }
+ }
+
class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE;
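Two details in the ShuffleHandler changes are easy to miss. First, recovery is opt-in: recordJobShuffleInfo() and removeJobShuffleInfo() both guard on stateDb != null, so a handler with no recovery path behaves exactly as before. Second, userRsrc and secretManager are now per-instance fields built in serviceStart() rather than static finals, so a freshly constructed handler starts empty and repopulates solely from the store. A minimal sketch of enabling recovery, mirroring the test below (in production the NM aux-service framework supplies the recovery path):

    ShuffleHandler shuffle = new ShuffleHandler();
    shuffle.setRecoveryPath(new Path(recoveryDir.toString()));  // normally set by the NodeManager
    shuffle.init(conf);
    shuffle.start();  // serviceStart(): recoverState() opens or creates the LevelDB store and replays it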
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/proto/ShuffleHandlerRecovery.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/proto/ShuffleHandlerRecovery.proto?rev=1594329&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/proto/ShuffleHandlerRecovery.proto (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/proto/ShuffleHandlerRecovery.proto Tue May 13 19:10:48 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.mapred.proto";
+option java_outer_classname = "ShuffleHandlerRecoveryProtos";
+option java_generic_services = true;
+package hadoop.mapreduce;
+
+import "Security.proto";
+
+message JobShuffleInfoProto {
+ optional string user = 1;
+ optional hadoop.common.TokenProto jobToken = 2;
+}
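A minimal round-trip of the new message, using the standard protobuf-java builder and parseFrom conventions (the literal values are placeholders; parseFrom throws the checked InvalidProtocolBufferException):

    TokenProto token = TokenProto.newBuilder()
        .setIdentifier(ByteString.copyFromUtf8("identifier"))
        .setPassword(ByteString.copyFromUtf8("password"))
        .setKind("mapreduce.job")
        .setService("shuffleService")
        .build();
    JobShuffleInfoProto record = JobShuffleInfoProto.newBuilder()
        .setUser("someuser").setJobToken(token).build();
    // toByteArray() yields exactly the value recordJobShuffleInfo() stores under
    // the job ID key; parseFrom() is what recoverJobShuffleInfo() applies on restart.
    JobShuffleInfoProto parsed = JobShuffleInfoProto.parseFrom(record.toByteArray());
    assert "someuser".equals(parsed.getUser());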
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1594329&r1=1594328&r2=1594329&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Tue May 13 19:10:48 2014
@@ -51,11 +51,15 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
@@ -68,6 +72,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@@ -645,4 +650,93 @@ public class TestShuffleHandler {
output.writeLong(chk.getChecksum().getValue());
output.close();
}
+
+ @Test
+ public void testRecovery() throws IOException {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
+ final File tmpDir = new File(System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ TestShuffleHandler.class.getName());
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ ShuffleHandler shuffle = new ShuffleHandler();
+ // emulate aux services startup with recovery enabled
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ tmpDir.mkdirs();
+ try {
+ shuffle.init(conf);
+ shuffle.start();
+
+ // setup a shuffle token for an application
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+ "identifier".getBytes(), "password".getBytes(), new Text(user),
+ new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffle.initializeApplication(new ApplicationInitializationContext(user,
+ appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+ outputBuffer.getLength())));
+
+ // verify we are authorized to shuffle
+ int rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // emulate shuffle handler restart
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+
+ // verify we are still authorized to shuffle to the old application
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // shutdown app and verify access is lost
+ shuffle.stopApplication(new ApplicationTerminationContext(appId));
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
+
+ // emulate shuffle handler restart
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+
+ // verify we still don't have access
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
+ } finally {
+ if (shuffle != null) {
+ shuffle.close();
+ }
+ FileUtil.fullyDelete(tmpDir);
+ }
+ }
+
+ private static int getShuffleResponseCode(ShuffleHandler shuffle,
+ Token<JobTokenIdentifier> jt) throws IOException {
+ URL url = new URL("http://127.0.0.1:"
+ + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ String encHash = SecureShuffleUtils.hashFromString(
+ SecureShuffleUtils.buildMsgFrom(url),
+ JobTokenSecretManager.createSecretKey(jt.getPassword()));
+ conn.addRequestProperty(
+ SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ conn.connect();
+ int rc = conn.getResponseCode();
+ conn.disconnect();
+ return rc;
+ }
}
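The test walks the full lifecycle across two emulated restarts, where each restart is a close() followed by a brand-new handler pointed at the same recovery path. It configures port 0 yet can still build a request URL because (in existing ShuffleHandler startup code, not shown in this diff) the actual bound port is written back into SHUFFLE_PORT_CONFIG_KEY. Expected responses, phase by phase:

    // initializeApplication           -> shuffle GET -> 200 OK
    // restart (close + new instance)  -> shuffle GET -> 200 OK   (state recovered from LevelDB)
    // stopApplication                 -> shuffle GET -> 401      (token removed, record deleted)
    // restart again                   -> shuffle GET -> 401      (the removal was persisted too)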
Modified: hadoop/common/trunk/hadoop-mapreduce-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/pom.xml?rev=1594329&r1=1594328&r2=1594329&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/pom.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/pom.xml Tue May 13 19:10:48 2014
@@ -144,6 +144,10 @@
<artifactId>hsqldb</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ </dependency>
</dependencies>