You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2014/07/23 19:30:09 UTC
svn commit: r1612883 - in
/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/ja...
Author: szetszwo
Date: Wed Jul 23 17:30:06 2014
New Revision: 1612883
URL: http://svn.apache.org/r1612883
Log:
Merge r1609845 through r1612880 from trunk.
Modified:
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1612432-1612880
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt?rev=1612883&r1=1612882&r2=1612883&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt Wed Jul 23 17:30:06 2014
@@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5971. Move the default options for distcp -p to
DistCpOptionSwitch. (clamb via wang)
+ MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with
+ compatible/incompatible changes (Junping Du via jlowe)
+
OPTIMIZATIONS
BUG FIXES
Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1612432-1612880
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1612883&r1=1612882&r2=1612883&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed Jul 23 17:30:06 2014
@@ -82,10 +82,13 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;
@@ -125,6 +128,7 @@ import org.jboss.netty.handler.stream.Ch
import org.jboss.netty.util.CharsetUtil;
import org.mortbay.jetty.HttpHeaders;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@@ -146,8 +150,9 @@ public class ShuffleHandler extends Auxi
Pattern.CASE_INSENSITIVE);
private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
- private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
- private static final String STATE_DB_SCHEMA_VERSION = "1.0";
+ private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
+ protected static final NMDBSchemaVersion CURRENT_VERSION_INFO =
+ NMDBSchemaVersion.newInstance(1, 0);
private int port;
private ChannelFactory selector;
@@ -466,18 +471,15 @@ public class ShuffleHandler extends Auxi
Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
LOG.info("Using state database at " + dbPath + " for recovery");
File dbfile = new File(dbPath.toString());
- byte[] schemaVersionData;
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
- schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating state database at " + dbfile);
options.createIfMissing(true);
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
- schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
- stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
+ storeVersion();
} catch (DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}
@@ -485,15 +487,69 @@ public class ShuffleHandler extends Auxi
throw e;
}
}
- if (schemaVersionData != null) {
- String schemaVersion = asString(schemaVersionData);
- // only support exact schema matches for now
- if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
- throw new IOException("Incompatible state database schema, found "
- + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
- }
+ checkVersion();
+ }
+
+ @VisibleForTesting
+ NMDBSchemaVersion loadVersion() throws IOException {
+ byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
+ // if no version was stored previously, treat it as 1.0.
+ if (data == null || data.length == 0) {
+ return NMDBSchemaVersion.newInstance(1, 0);
+ }
+ NMDBSchemaVersion version =
+ new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
+ return version;
+ }
+
+ private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException {
+ String key = STATE_DB_SCHEMA_VERSION_KEY;
+ byte[] data =
+ ((NMDBSchemaVersionPBImpl) version).getProto().toByteArray();
+ try {
+ stateDb.put(bytes(key), data);
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ private void storeVersion() throws IOException {
+ storeSchemaVersion(CURRENT_VERSION_INFO);
+ }
+
+ // Only used for test
+ @VisibleForTesting
+ void storeVersion(NMDBSchemaVersion version) throws IOException {
+ storeSchemaVersion(version);
+ }
+
+ protected NMDBSchemaVersion getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ /**
+ * 1) Versioning scheme: major.minor, e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+ * 2) Any incompatible change of DB schema is a major upgrade, and any
+ * compatible change of DB schema is a minor upgrade.
+ * 3) Within a minor upgrade, say 1.1 to 1.2:
+ * overwrite the version info and proceed as normal.
+ * 4) Within a major upgrade, say 1.2 to 2.0:
+ * throw an exception and tell the user to use a separate upgrade tool to
+ * upgrade shuffle info or to remove the incompatible old state.
+ */
+ private void checkVersion() throws IOException {
+ NMDBSchemaVersion loadedVersion = loadVersion();
+ LOG.info("Loaded state DB schema version info " + loadedVersion);
+ if (loadedVersion.equals(getCurrentVersion())) {
+ return;
+ }
+ if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+ LOG.info("Storing state DB schedma version info " + getCurrentVersion());
+ storeVersion();
} else {
- throw new IOException("State database schema version not found");
+ throw new IOException(
+ "Incompatible version for state DB schema: expecting DB schema version "
+ + getCurrentVersion() + ", but loading version " + loadedVersion);
}
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1612883&r1=1612882&r2=1612883&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Wed Jul 23 17:30:06 2014
@@ -67,6 +67,7 @@ import org.apache.hadoop.metrics2.Metric
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -718,6 +720,94 @@ public class TestShuffleHandler {
FileUtil.fullyDelete(tmpDir);
}
}
+
+ @Test
+ public void testRecoveryFromOtherVersions() throws IOException {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ final File tmpDir = new File(System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ TestShuffleHandler.class.getName());
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ ShuffleHandler shuffle = new ShuffleHandler();
+ // emulate aux services startup with recovery enabled
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ tmpDir.mkdirs();
+ try {
+ shuffle.init(conf);
+ shuffle.start();
+
+ // setup a shuffle token for an application
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+ "identifier".getBytes(), "password".getBytes(), new Text(user),
+ new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffle.initializeApplication(new ApplicationInitializationContext(user,
+ appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+ outputBuffer.getLength())));
+
+ // verify we are authorized to shuffle
+ int rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // emulate shuffle handler restart
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+
+ // verify we are still authorized to shuffle to the old application
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+ NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0);
+ Assert.assertEquals(version, shuffle.getCurrentVersion());
+
+ // emulate shuffle handler restart with compatible version
+ NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1);
+ // update version info before closing shuffle
+ shuffle.storeVersion(version11);
+ Assert.assertEquals(version11, shuffle.loadVersion());
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+ // shuffle version will be overridden by CURRENT_VERSION_INFO after a
+ // successful restart.
+ Assert.assertEquals(version, shuffle.loadVersion());
+ // verify we are still authorized to shuffle to the old application
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // emulate shuffle handler restart with incompatible version
+ NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1);
+ shuffle.storeVersion(version21);
+ Assert.assertEquals(version21, shuffle.loadVersion());
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+
+ try {
+ shuffle.start();
+ Assert.fail("Incompatible version, should expect fail here.");
+ } catch (ServiceStateException e) {
+ Assert.assertTrue("Exception message mismatch",
+ e.getMessage().contains("Incompatible version for state DB schema:"));
+ }
+
+ } finally {
+ if (shuffle != null) {
+ shuffle.close();
+ }
+ FileUtil.fullyDelete(tmpDir);
+ }
+ }
private static int getShuffleResponseCode(ShuffleHandler shuffle,
Token<JobTokenIdentifier> jt) throws IOException {