You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/09/21 01:07:51 UTC

[48/50] [abbrv] hadoop git commit: YARN-5947: Create LeveldbConfigurationStore class using Leveldb as backing store. Contributed by Jonathan Hung

YARN-5947: Create LeveldbConfigurationStore class using Leveldb as backing store. Contributed by Jonathan Hung


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b06711cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b06711cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b06711cb

Branch: refs/heads/YARN-5734
Commit: b06711cb1f6cdbe5f60fbd1f0cbf7890e1ef779d
Parents: e462f10
Author: Xuan <xg...@apache.org>
Authored: Mon Jul 31 16:48:40 2017 -0700
Committer: Jonathan Hung <jh...@linkedin.com>
Committed: Wed Sep 20 17:40:54 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  13 +
 .../src/main/resources/yarn-default.xml         |  29 ++
 .../scheduler/MutableConfigurationProvider.java |   6 +
 .../scheduler/capacity/CapacityScheduler.java   |   3 +
 .../conf/LeveldbConfigurationStore.java         | 314 +++++++++++++++++++
 .../conf/MutableCSConfigurationProvider.java    |  38 ++-
 .../capacity/conf/YarnConfigurationStore.java   |  14 +-
 .../conf/TestYarnConfigurationStore.java        |   3 +-
 8 files changed, 414 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b06711cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e1062d7..a33d85d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -677,8 +677,21 @@ public class YarnConfiguration extends Configuration {
   public static final String SCHEDULER_CONFIGURATION_STORE_CLASS =
       YARN_PREFIX + "scheduler.configuration.store.class";
   public static final String MEMORY_CONFIGURATION_STORE = "memory";
+  public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
   public static final String DEFAULT_CONFIGURATION_STORE =
       MEMORY_CONFIGURATION_STORE;
+  /**
+   * Storage path for the LevelDB implementation of the scheduler
+   * configuration store; used when the store class is "leveldb".
+   */
+  public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
+      + "scheduler.configuration.leveldb-store.path";
+
+  /**
+   * Compaction interval, in seconds, for the LevelDB configuration store.
+   */
+  public static final String RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS =
+      YARN_PREFIX
+          + "scheduler.configuration.leveldb-store.compaction-interval-secs";
+  /** Default compaction interval: one day, expressed in seconds. */
+  public static final long
+      DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L;
+
+  /**
+   * Maximum number of configuration change log entries kept in the LevelDB
+   * configuration store.
+   */
+  public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS =
+      YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs";
+  /** Default maximum number of configuration change log entries. */
+  public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
 
   public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
       YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b06711cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 86aa15e..4529f20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3371,4 +3371,33 @@
     <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.DefaultConfigurationMutationACLPolicy</value>
   </property>
 
+  <property>
+    <description>
+      The storage path for LevelDB implementation of configuration store,
+      when yarn.scheduler.configuration.store.class is configured to be
+      "leveldb".
+    </description>
+    <name>yarn.scheduler.configuration.leveldb-store.path</name>
+    <value>${hadoop.tmp.dir}/yarn/system/confstore</value>
+  </property>
+
+  <property>
+    <description>
+      The compaction interval for LevelDB configuration store in secs,
+      when yarn.scheduler.configuration.store.class is configured to be
+      "leveldb". Default is one day.
+    </description>
+    <name>yarn.scheduler.configuration.leveldb-store.compaction-interval-secs</name>
+    <value>86400</value>
+  </property>
+
+  <property>
+    <description>
+      The max number of configuration change log entries kept in LevelDB config
+      store, when yarn.scheduler.configuration.store.class is configured to be
+      "leveldb". Default is 1000.
+    </description>
+    <name>yarn.scheduler.configuration.leveldb-store.max-logs</name>
+    <value>1000</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b06711cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index 86be7c3..1f13467 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -29,6 +29,12 @@ import java.io.IOException;
 public interface MutableConfigurationProvider {
 
   /**
+   * Apply transactions which were not committed.
+   * @throws IOException if recovery fails
+   */
+  void recoverConf() throws IOException;
+
+  /**
    * Update the scheduler configuration with the provided key value pairs.
    * @param user User issuing the request
    * @param confUpdate Key-value pairs for configurations to be updated.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b06711cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index da395b7..6d2de7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -393,6 +393,9 @@ public class CapacityScheduler extends
   @Override
   public void serviceStart() throws Exception {
     startSchedulerThreads();
+    // With a mutable configuration provider, apply configuration
+    // transactions that were logged but never committed before the
+    // scheduler service finishes starting.
+    if (this.csConfProvider instanceof MutableConfigurationProvider) {
+      ((MutableConfigurationProvider) csConfProvider).recoverConf();
+    }
     super.serviceStart();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b06711cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
new file mode 100644
index 0000000..1534685
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBComparator;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+/**
+ * A LevelDB implementation of {@link YarnConfigurationStore}.
+ *
+ * <p>Keys are ordered by the comparator installed in {@code initDatabase}:
+ * write-ahead log entries {@code log.<txnId>} sorted numerically by
+ * transaction id, then the {@code committedTxn} marker, then the scheduler
+ * configuration key/value pairs in lexicographic order.
+ */
+public class LeveldbConfigurationStore implements YarnConfigurationStore {
+
+  public static final Log LOG =
+      LogFactory.getLog(LeveldbConfigurationStore.class);
+
+  private static final String DB_NAME = "yarn-conf-store";
+  private static final String LOG_PREFIX = "log.";
+  private static final String LOG_COMMITTED_TXN = "committedTxn";
+
+  private DB db;
+  // Id handed out to the most recently logged mutation.
+  private long txnId = 0;
+  // Id of the oldest log entry that logMutation will prune next.
+  private long minTxn = 0;
+  private long maxLogs;
+  private Configuration conf;
+  // Mutations that were logged but not yet confirmed, oldest first.
+  private LinkedList<LogMutation> pendingMutations = new LinkedList<>();
+  private Timer compactionTimer;
+  private long compactionIntervalMsec;
+
+  /**
+   * Opens (or creates) the database, loads the committed transaction id and
+   * the still-pending mutations, and starts the compaction timer.
+   *
+   * @param config configuration holding the store settings
+   * @param schedConf initial scheduler configuration, persisted only when the
+   *                  database is created
+   * @throws IOException if the database cannot be opened or read
+   */
+  @Override
+  public void initialize(Configuration config, Configuration schedConf)
+      throws IOException {
+    this.conf = config;
+    try {
+      this.db = initDatabase(schedConf);
+      byte[] committed = db.get(bytes(LOG_COMMITTED_TXN));
+      if (committed == null) {
+        throw new IOException("Configuration store is missing the "
+            + LOG_COMMITTED_TXN + " marker");
+      }
+      this.txnId = Long.parseLong(
+          new String(committed, StandardCharsets.UTF_8));
+      // The iterator holds native resources, so make sure it is closed.
+      try (DBIterator itr = db.iterator()) {
+        String committedKey = LOG_PREFIX + txnId;
+        itr.seek(bytes(committedKey));
+        // seek() lands on the first key >= log.<txnId>. Skip that entry only
+        // when it really is the committed log; if that log is absent (for
+        // example nothing was committed yet) the entry found is already the
+        // first uncommitted one and must not be dropped.
+        if (itr.hasNext() && committedKey.equals(
+            new String(itr.peekNext().getKey(), StandardCharsets.UTF_8))) {
+          itr.next();
+        }
+        while (itr.hasNext()) {
+          Map.Entry<byte[], byte[]> entry = itr.next();
+          if (!new String(entry.getKey(), StandardCharsets.UTF_8)
+              .startsWith(LOG_PREFIX)) {
+            break;
+          }
+          pendingMutations.add(deserLogMutation(entry.getValue()));
+        }
+        // Get the earliest txnId stored in logs
+        itr.seekToFirst();
+        if (itr.hasNext()) {
+          Map.Entry<byte[], byte[]> entry = itr.next();
+          byte[] key = entry.getKey();
+          String logId = new String(key, StandardCharsets.UTF_8);
+          if (logId.startsWith(LOG_PREFIX)) {
+            minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1));
+          }
+        }
+      }
+      if (!pendingMutations.isEmpty()) {
+        // Pending entries already own ids above the committed one. Continue
+        // numbering after them so a new mutation cannot overwrite their log.
+        this.txnId = pendingMutations.getLast().getId();
+      }
+      this.maxLogs = config.getLong(
+          YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS,
+          YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
+      this.compactionIntervalMsec = config.getLong(
+          YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
+          YarnConfiguration
+              .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
+      startCompactionTimer();
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Opens the database under the configured storage directory. If it does
+   * not exist yet, creates it and writes the initial scheduler configuration
+   * together with a {@code committedTxn} marker of 0.
+   *
+   * @param config initial scheduler configuration to persist on creation
+   * @return the opened database (also stored in {@code db})
+   * @throws Exception if the database cannot be opened or created
+   */
+  private DB initDatabase(Configuration config) throws Exception {
+    Path storeRoot = createStorageDir();
+    Options options = new Options();
+    options.createIfMissing(false);
+    options.comparator(new DBComparator() {
+      @Override
+      public int compare(byte[] key1, byte[] key2) {
+        String key1Str = new String(key1, StandardCharsets.UTF_8);
+        String key2Str = new String(key2, StandardCharsets.UTF_8);
+        // Non-log keys get the largest value so they sort after every log.
+        // Ids are parsed as long because txnId itself is a long.
+        long key1Txn = Long.MAX_VALUE;
+        long key2Txn = Long.MAX_VALUE;
+        if (key1Str.startsWith(LOG_PREFIX)) {
+          key1Txn = Long.parseLong(key1Str.substring(
+              key1Str.indexOf('.') + 1));
+        }
+        if (key2Str.startsWith(LOG_PREFIX)) {
+          key2Txn = Long.parseLong(key2Str.substring(
+              key2Str.indexOf('.') + 1));
+        }
+        if (key1Txn == Long.MAX_VALUE && key2Txn == Long.MAX_VALUE) {
+          // Neither key is a log: the committedTxn marker sorts first, the
+          // remaining (configuration) keys sort lexicographically.
+          if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) {
+            return 0;
+          } else if (key1Str.equals(LOG_COMMITTED_TXN)) {
+            return -1;
+          } else if (key2Str.equals(LOG_COMMITTED_TXN)) {
+            return 1;
+          }
+          return key1Str.compareTo(key2Str);
+        }
+        return Long.compare(key1Txn, key2Txn);
+      }
+
+      @Override
+      public String name() {
+        return "logComparator";
+      }
+
+      @Override
+      public byte[] findShortestSeparator(byte[] start, byte[] limit) {
+        return start;
+      }
+
+      @Override
+      public byte[] findShortSuccessor(byte[] key) {
+        return key;
+      }
+    });
+    LOG.info("Using conf database at " + storeRoot);
+    File dbfile = new File(storeRoot.toString());
+    try {
+      db = JniDBFactory.factory.open(dbfile, options);
+    } catch (NativeDB.DBException e) {
+      String msg = e.getMessage();
+      if (e.isNotFound() || (msg != null && msg.contains(" does not exist "))) {
+        LOG.info("Creating conf database at " + dbfile);
+        options.createIfMissing(true);
+        try {
+          db = JniDBFactory.factory.open(dbfile, options);
+          // Write the initial scheduler configuration
+          try (WriteBatch initBatch = db.createWriteBatch()) {
+            for (Map.Entry<String, String> kv : config) {
+              initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
+            }
+            initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0"));
+            db.write(initBatch);
+          }
+        } catch (DBException dbErr) {
+          throw new IOException(dbErr.getMessage(), dbErr);
+        }
+      } else {
+        throw e;
+      }
+    }
+    return db;
+  }
+
+  /**
+   * Creates the storage directory on the local file system with permission
+   * 0700 and returns its path.
+   */
+  private Path createStorageDir() throws IOException {
+    Path root = getStorageDir();
+    FileSystem fs = FileSystem.getLocal(conf);
+    fs.mkdirs(root, new FsPermission((short) 0700));
+    return root;
+  }
+
+  /**
+   * Resolves the database directory from
+   * {@link YarnConfiguration#RM_SCHEDCONF_STORE_PATH}.
+   *
+   * @throws IOException if no store path is configured
+   */
+  private Path getStorageDir() throws IOException {
+    String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
+    if (storePath == null) {
+      throw new IOException("No store location directory configured in " +
+          YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
+    }
+    return new Path(storePath, DB_NAME);
+  }
+
+  /**
+   * Assigns the next transaction id to {@code logMutation}, writes it to the
+   * log and, once more than {@code maxLogs} entries would be kept, deletes
+   * the oldest log entry in the same batch.
+   *
+   * @param logMutation mutation to persist
+   * @return id assigned to the mutation
+   * @throws IOException if the mutation cannot be serialized
+   */
+  @Override
+  public synchronized long logMutation(LogMutation logMutation)
+      throws IOException {
+    logMutation.setId(++txnId);
+    try (WriteBatch logBatch = db.createWriteBatch()) {
+      logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation));
+      if (txnId - minTxn >= maxLogs) {
+        logBatch.delete(bytes(LOG_PREFIX + minTxn));
+        minTxn++;
+      }
+      db.write(logBatch);
+    }
+    pendingMutations.add(logMutation);
+    return txnId;
+  }
+
+  /**
+   * Marks the mutation {@code id} as committed. When {@code isValid} is true
+   * its updates are applied to the persisted configuration in the same
+   * batch; a null or empty value deletes the key.
+   *
+   * @param id id of the mutation being confirmed
+   * @param isValid whether the mutation's updates should be applied
+   * @return true on success
+   * @throws IOException if the logged mutation is missing or unreadable
+   */
+  @Override
+  public synchronized boolean confirmMutation(long id, boolean isValid)
+      throws IOException {
+    try (WriteBatch updateBatch = db.createWriteBatch()) {
+      if (isValid) {
+        byte[] logged = db.get(bytes(LOG_PREFIX + id));
+        if (logged == null) {
+          throw new IOException("No logged mutation found for id " + id);
+        }
+        LogMutation mutation = deserLogMutation(logged);
+        for (Map.Entry<String, String> changes :
+            mutation.getUpdates().entrySet()) {
+          if (changes.getValue() == null || changes.getValue().isEmpty()) {
+            updateBatch.delete(bytes(changes.getKey()));
+          } else {
+            updateBatch.put(bytes(changes.getKey()),
+                bytes(changes.getValue()));
+          }
+        }
+      }
+      updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id)));
+      db.write(updateBatch);
+    }
+    // Assumes logMutation and confirmMutation are done in the same
+    // synchronized method. For example,
+    // {@link MutableCSConfigurationProvider#mutateConfiguration(
+    // UserGroupInformation user, SchedConfUpdateInfo confUpdate)}
+    pendingMutations.removeFirst();
+    return true;
+  }
+
+  /** Serializes a mutation with Java object serialization. */
+  private byte[] serLogMutation(LogMutation mutation) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (ObjectOutput oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(mutation);
+      oos.flush();
+      return baos.toByteArray();
+    }
+  }
+
+  /** Deserializes a mutation previously written by serLogMutation. */
+  private LogMutation deserLogMutation(byte[] mutation) throws IOException {
+    try (ObjectInput input = new ObjectInputStream(
+        new ByteArrayInputStream(mutation))) {
+      return (LogMutation) input.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Builds a configuration from every key stored after the
+   * {@code committedTxn} marker.
+   *
+   * @return the persisted scheduler configuration
+   */
+  @Override
+  public synchronized Configuration retrieve() {
+    Configuration config = new Configuration(false);
+    DBIterator itr = db.iterator();
+    try {
+      itr.seek(bytes(LOG_COMMITTED_TXN));
+      // Skip the committedTxn marker itself.
+      itr.next();
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = itr.next();
+        config.set(new String(entry.getKey(), StandardCharsets.UTF_8),
+            new String(entry.getValue(), StandardCharsets.UTF_8));
+      }
+    } finally {
+      try {
+        itr.close();
+      } catch (Exception e) {
+        // The configuration has been read already; a failed close must not
+        // fail the caller.
+        LOG.warn("Failed to close configuration store iterator", e);
+      }
+    }
+    return config;
+  }
+
+  /**
+   * Returns a snapshot of the pending mutations, oldest first. A copy is
+   * returned because confirmMutation removes entries from the internal list,
+   * which would otherwise break callers that confirm while iterating.
+   */
+  @Override
+  public synchronized List<LogMutation> getPendingMutations() {
+    return new LinkedList<>(pendingMutations);
+  }
+
+  @Override
+  public List<LogMutation> getConfirmedConfHistory(long fromId) {
+    return null; // unimplemented
+  }
+
+  // TODO below was taken from LeveldbRMStateStore, it can probably be
+  // refactored
+  /** Schedules periodic full compaction when the interval is positive. */
+  private void startCompactionTimer() {
+    if (compactionIntervalMsec > 0) {
+      compactionTimer = new Timer(
+          this.getClass().getSimpleName() + " compaction timer", true);
+      compactionTimer.schedule(new CompactionTimerTask(),
+          compactionIntervalMsec, compactionIntervalMsec);
+    }
+  }
+
+  /** Timer task that compacts the whole key range and logs the duration. */
+  private class CompactionTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      long start = Time.monotonicNow();
+      LOG.info("Starting full compaction cycle");
+      try {
+        db.compactRange(null, null);
+      } catch (DBException e) {
+        LOG.error("Error compacting database", e);
+      }
+      long duration = Time.monotonicNow() - start;
+      LOG.info("Full compaction cycle completed in " + duration + " msec");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b06711cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 670c0f9..9ccc146 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
 import com.google.common.base.Joiner;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +51,9 @@ import java.util.Map;
 public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     MutableConfigurationProvider {
 
+  public static final Log LOG =
+      LogFactory.getLog(MutableCSConfigurationProvider.class);
+
   private Configuration schedConf;
   private YarnConfigurationStore confStore;
   private ConfigurationMutationACLPolicy aclMutationPolicy;
@@ -68,6 +73,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
       this.confStore = new InMemoryConfigurationStore();
       break;
+    case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
+      this.confStore = new LeveldbConfigurationStore();
+      break;
     default:
       this.confStore = YarnConfigurationStoreFactory.getStore(config);
       break;
@@ -82,6 +90,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
       schedConf.set(kv.getKey(), kv.getValue());
     }
     confStore.initialize(config, schedConf);
+    // After initializing confStore, the store may already have an existing
+    // configuration. Use this one.
+    schedConf = confStore.retrieve();
     this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
         .getPolicy(config);
     aclMutationPolicy.init(config, rmContext);
@@ -97,7 +108,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   }
 
   @Override
-  public void mutateConfiguration(UserGroupInformation user,
+  public synchronized void mutateConfiguration(UserGroupInformation user,
       SchedConfUpdateInfo confUpdate) throws IOException {
     if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
       throw new AccessControlException("User is not admin of all modified" +
@@ -124,6 +135,31 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     confStore.confirmMutation(id, true);
   }
 
+  /**
+   * Replays, in order, every mutation the configuration store reports as
+   * pending. Each mutation is applied to the in-memory scheduler
+   * configuration and the scheduler is reinitialized; the store is then told
+   * whether the mutation was accepted or rejected.
+   *
+   * @throws IOException if confirming a mutation in the store fails
+   */
+  @Override
+  public void recoverConf() throws IOException {
+    // Iterate over a snapshot: confirmMutation() may remove entries from the
+    // very list the store hands out, which would otherwise skip pending
+    // mutations or fail with ConcurrentModificationException.
+    List<LogMutation> uncommittedLogs =
+        new java.util.ArrayList<>(confStore.getPendingMutations());
+    for (LogMutation mutation : uncommittedLogs) {
+      // Snapshot per mutation so that a rejection rolls back only this
+      // mutation and keeps the ones already accepted.
+      Configuration oldConf = new Configuration(schedConf);
+      for (Map.Entry<String, String> kv : mutation.getUpdates().entrySet()) {
+        if (kv.getValue() == null) {
+          schedConf.unset(kv.getKey());
+        } else {
+          schedConf.set(kv.getKey(), kv.getValue());
+        }
+      }
+      try {
+        rmContext.getScheduler().reinitialize(conf, rmContext);
+      } catch (IOException e) {
+        schedConf = oldConf;
+        confStore.confirmMutation(mutation.getId(), false);
+        LOG.info("Configuration mutation " + mutation.getId()
+            + " was rejected", e);
+        continue;
+      }
+      confStore.confirmMutation(mutation.getId(), true);
+      LOG.info("Configuration mutation " + mutation.getId()+ " was accepted");
+    }
+  }
 
   private Map<String, String> constructKeyValueConfUpdate(
       SchedConfUpdateInfo mutationInfo) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b06711cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 22c0ef8..065c877 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -43,7 +45,7 @@ public interface YarnConfigurationStore {
    * LogMutation encapsulates the fields needed for configuration mutation
    * audit logging and recovery.
    */
-  class LogMutation {
+  class LogMutation implements Serializable {
     private Map<String, String> updates;
     private String user;
     private long id;
@@ -106,16 +108,19 @@ public interface YarnConfigurationStore {
    * Initialize the configuration store.
    * @param conf configuration to initialize store with
    * @param schedConf Initial key-value configuration to persist
+   * @throws IOException if initialization fails
    */
-  void initialize(Configuration conf, Configuration schedConf);
+  void initialize(Configuration conf, Configuration schedConf)
+      throws IOException;
 
   /**
    * Logs the configuration change to backing store. Generates an id associated
    * with this mutation, sets it in {@code logMutation}, and returns it.
    * @param logMutation configuration change to be persisted in write ahead log
    * @return id which configuration store associates with this mutation
+   * @throws IOException if logging fails
    */
-  long logMutation(LogMutation logMutation);
+  long logMutation(LogMutation logMutation) throws IOException;
 
   /**
    * Should be called after {@code logMutation}. Gets the pending mutation
@@ -130,8 +135,9 @@ public interface YarnConfigurationStore {
    * @param isValid if true, update persisted configuration with mutation
    *                associated with {@code id}.
    * @return true on success
+   * @throws IOException if mutation confirmation fails
    */
-  boolean confirmMutation(long id, boolean isValid);
+  boolean confirmMutation(long id, boolean isValid) throws IOException;
 
   /**
    * Retrieve the persisted configuration.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b06711cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
index dff4e77..631ce65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.Yar
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -43,7 +44,7 @@ public class TestYarnConfigurationStore {
   }
 
   @Test
-  public void testInMemoryConfigurationStore() {
+  public void testInMemoryConfigurationStore() throws IOException {
     confStore = new InMemoryConfigurationStore();
     confStore.initialize(new Configuration(), schedConf);
     assertEquals("val1", confStore.retrieve().get("key1"));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org