You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by zj...@apache.org on 2014/08/07 19:02:27 UTC

svn commit: r1616540 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-a...

Author: zjshen
Date: Thu Aug  7 17:02:26 2014
New Revision: 1616540

URL: http://svn.apache.org/r1616540
Log:
YARN-2288. Made persisted data in LevelDB timeline store be versioned. Contributed by Junping Du.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1616540&r1=1616539&r2=1616540&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Aug  7 17:02:26 2014
@@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2298. Move TimelineClient to yarn-common project (Zhijie Shen via 
     junping_du)
 
+    YARN-2288. Made persisted data in LevelDB timeline store be versioned. (Junping Du
+    via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java?rev=1616540&r1=1616539&r2=1616540&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java Thu Aug  7 17:02:26 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.ti
 
 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -60,8 +61,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
 import org.iq80.leveldb.DBIterator;
 import org.iq80.leveldb.Options;
 import org.iq80.leveldb.ReadOptions;
@@ -141,6 +146,11 @@ public class LeveldbTimelineStore extend
       "z".getBytes();
 
   private static final byte[] EMPTY_BYTES = new byte[0];
+  
+  private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version";
+  
+  private static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(1, 0);
 
   @Private
   @VisibleForTesting
@@ -193,6 +203,7 @@ public class LeveldbTimelineStore extend
     }
     LOG.info("Using leveldb path " + dbPath);
     db = factory.open(new File(dbPath.toString()), options);
+    checkVersion();
     startTimeWriteCache =
         Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
             conf)));
@@ -1270,8 +1281,6 @@ public class LeveldbTimelineStore extend
             DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
   }
 
-  // warning is suppressed to prevent eclipse from noting unclosed resource
-  @SuppressWarnings("resource")
   @VisibleForTesting
   List<String> getEntityTypes() throws IOException {
     DBIterator iterator = null;
@@ -1489,4 +1498,65 @@ public class LeveldbTimelineStore extend
     readOptions.fillCache(fillCache);
     return db.iterator(readOptions);
   }
+  
+  Version loadVersion() throws IOException {
+    byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
+    // if version is not stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return Version.newInstance(1, 0);
+    }
+    Version version =
+        new VersionPBImpl(VersionProto.parseFrom(data));
+    return version;
+  }
+  
+  // Only used for test
+  @VisibleForTesting
+  void storeVersion(Version state) throws IOException {
+    dbStoreVersion(state);
+  }
+  
+  private void dbStoreVersion(Version state) throws IOException {
+    String key = TIMELINE_STORE_VERSION_KEY;
+    byte[] data = 
+        ((VersionPBImpl) state).getProto().toByteArray();
+    try {
+      db.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+  
+  /**
+   * 1) Versioning timeline store: major.minor, e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+   * 2) Any incompatible change of TS-store is a major upgrade, and any
+   *    compatible change of TS-store is a minor upgrade.
+   * 3) Within a minor upgrade, say 1.1 to 1.2:
+   *    overwrite the version info and proceed as normal.
+   * 4) Within a major upgrade, say 1.2 to 2.0:
+   *    throw exception and direct the user to use a separate upgrade tool to
+   *    upgrade timeline store or remove incompatible old state.
+   */
+  private void checkVersion() throws IOException {
+    Version loadedVersion = loadVersion();
+    LOG.info("Loaded timeline store version info " + loadedVersion);
+    if (loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing timeline store version info " + getCurrentVersion());
+      dbStoreVersion(CURRENT_VERSION_INFO);
+    } else {
+      String incompatibleMessage = 
+          "Incompatible version for timeline store: expecting version " 
+              + getCurrentVersion() + ", but loading version " + loadedVersion;
+      LOG.fatal(incompatibleMessage);
+      throw new IOException(incompatibleMessage);
+    }
+  }
+  
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java?rev=1616540&r1=1616539&r2=1616540&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java Thu Aug  7 17:02:26 2014
@@ -36,14 +36,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.iq80.leveldb.DBIterator;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,19 +55,19 @@ import org.junit.Test;
 public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
   private FileContext fsContext;
   private File fsPath;
+  private Configuration config = new YarnConfiguration();
 
   @Before
   public void setup() throws Exception {
     fsContext = FileContext.getLocalFSFileContext();
-    Configuration conf = new YarnConfiguration();
     fsPath = new File("target", this.getClass().getSimpleName() +
         "-tmpDir").getAbsoluteFile();
     fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
-    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+    config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
         fsPath.getAbsolutePath());
-    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+    config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
     store = new LeveldbTimelineStore();
-    store.init(conf);
+    store.init(config);
     store.start();
     loadTestData();
     loadVerificationData();
@@ -263,5 +266,47 @@ public class TestLeveldbTimelineStore ex
     assertEquals(1, getEntities("type_2").size());
     assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
   }
+  
+  @Test
+  public void testCheckVersion() throws IOException {
+    LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store;
+    // default version
+    Version defaultVersion = dbStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+    // compatible version
+    Version compatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    dbStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
+    restartTimelineStore();
+    dbStore = (LeveldbTimelineStore) store;
+    // on restart the stored compatible version is overwritten with the current one
+    Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+    // incompatible version
+    Version incompatibleVersion =
+      Version.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    dbStore.storeVersion(incompatibleVersion);
+    try {
+      restartTimelineStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      Assert.assertTrue("Exception message mismatch", 
+        e.getMessage().contains("Incompatible version for timeline store"));
+    }
+  }
+  
+  private void restartTimelineStore() throws IOException {
+    // need to close so leveldb releases database lock
+    if (store != null) {
+      store.close();
+    }
+    store = new LeveldbTimelineStore();
+    store.init(config);
+    store.start();
+  }
 
 }