You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/09/22 18:27:16 UTC
[17/30] hadoop git commit: YARN-5946: Create YarnConfigurationStore
interface and InMemoryConfigurationStore class. Contributed by Jonathan Hung
YARN-5946: Create YarnConfigurationStore interface and
InMemoryConfigurationStore class. Contributed by Jonathan Hung
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/70275b53
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/70275b53
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/70275b53
Branch: refs/heads/YARN-5734
Commit: 70275b532c901a9b4b471f8c4efef62ba500a586
Parents: b53ac13
Author: Xuan <xg...@apache.org>
Authored: Fri Feb 24 15:58:12 2017 -0800
Committer: Jonathan Hung <jh...@linkedin.com>
Committed: Fri Sep 22 11:26:29 2017 -0700
----------------------------------------------------------------------
.../conf/InMemoryConfigurationStore.java | 86 +++++++++++
.../capacity/conf/YarnConfigurationStore.java | 154 +++++++++++++++++++
.../conf/TestYarnConfigurationStore.java | 70 +++++++++
3 files changed, 310 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70275b53/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
new file mode 100644
index 0000000..a208fb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A default implementation of {@link YarnConfigurationStore}. Doesn't offer
+ * persistent configuration storage, just stores the configuration in memory.
+ */
+public class InMemoryConfigurationStore implements YarnConfigurationStore {
+
+  private Configuration schedConf;
+  private final LinkedList<LogMutation> pendingMutations = new LinkedList<>();
+  private long pendingId;
+
+  @Override
+  public synchronized void initialize(Configuration conf,
+      Configuration schedConf) {
+    // Locked like every other method so these writes are visible to them.
+    this.schedConf = schedConf;
+    this.pendingMutations.clear();
+    this.pendingId = 0;
+  }
+
+  @Override
+  public synchronized long logMutation(LogMutation logMutation) {
+    logMutation.setId(++pendingId);
+    pendingMutations.add(logMutation);
+    return pendingId;
+  }
+
+  @Override
+  public synchronized boolean confirmMutation(long id, boolean isValid) {
+    // Out-of-order confirmation discards every mutation queued ahead of id.
+    LogMutation mutation;
+    while ((mutation = pendingMutations.poll()) != null) {
+      if (mutation.getId() != id) {
+        continue;
+      }
+      if (isValid) {
+        for (Map.Entry<String, String> kv : mutation.getUpdates().entrySet()) {
+          schedConf.set(kv.getKey(), kv.getValue());
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized Configuration retrieve() {
+    return schedConf;
+  }
+
+  @Override
+  public synchronized List<LogMutation> getPendingMutations() {
+    // Snapshot, so callers may confirm mutations while iterating the result.
+    return new LinkedList<>(pendingMutations);
+  }
+
+  @Override
+  public List<LogMutation> getConfirmedConfHistory(long fromId) {
+    return null; // Unimplemented.
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70275b53/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
new file mode 100644
index 0000000..22c0ef8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * YarnConfigurationStore exposes the methods needed for retrieving and
+ * persisting {@link CapacityScheduler} configuration as key-value pairs
+ * using write-ahead logging. When a configuration mutation is requested, the
+ * caller should first log it with {@code logMutation}, which persists it as
+ * a pending mutation. The mutation is merged into the persisted configuration
+ * only after {@code confirmMutation} is called.
+ *
+ * <p>On startup/recovery, the caller should call {@code retrieve} to get the
+ * configuration with all confirmed mutations applied, then fetch the
+ * still-unconfirmed mutations via {@code getPendingMutations}, and
+ * replay/confirm them via {@code confirmMutation} as in the normal case.
+ */
+public interface YarnConfigurationStore {
+
+  /**
+   * A logged configuration change: the key-value updates, the requesting
+   * user and a transaction id. Used for audit logging and recovery.
+   */
+  class LogMutation {
+    private final Map<String, String> updates;
+    private final String user;
+    private long id;
+
+    /**
+     * Build a mutation that has not been logged yet; its id starts at 0.
+     * @param updates key-value configuration updates
+     * @param user user who requested configuration change
+     */
+    public LogMutation(Map<String, String> updates, String user) {
+      this(updates, user, 0);
+    }
+
+    /**
+     * Build a mutation whose transaction id is already known (recovery).
+     * @param updates key-value configuration updates
+     * @param user user who requested configuration change
+     * @param id transaction id of configuration change
+     */
+    LogMutation(Map<String, String> updates, String user, long id) {
+      this.id = id;
+      this.user = user;
+      this.updates = updates;
+    }
+
+    /**
+     * Returns the transaction id currently attached to this mutation.
+     * @return transaction id
+     */
+    public long getId() {
+      return id;
+    }
+
+    /**
+     * Replaces the transaction id attached to this mutation.
+     * @param id transaction id
+     */
+    public void setId(long id) {
+      this.id = id;
+    }
+
+    /**
+     * Returns the user on whose behalf the change was requested.
+     * @return user who requested configuration change
+     */
+    public String getUser() {
+      return user;
+    }
+
+    /**
+     * Returns the key-value configuration updates of this mutation.
+     * @return map of configuration updates
+     */
+    public Map<String, String> getUpdates() {
+      return updates;
+    }
+  }
+
+  /**
+   * Prepares the store for use.
+   * @param conf configuration to initialize store with
+   * @param schedConf initial key-value configuration to persist
+   */
+  void initialize(Configuration conf, Configuration schedConf);
+
+  /**
+   * Records a configuration change in the backing store. The store assigns
+   * an id to the change, sets it on {@code logMutation}, and returns it.
+   * @param logMutation configuration change to be persisted in write ahead log
+   * @return id which configuration store associates with this mutation
+   */
+  long logMutation(LogMutation logMutation);
+
+  /**
+   * Resolves a pending mutation; call only after {@code logMutation}. The
+   * mutation associated with {@code id} stops being pending, and when
+   * {@code isValid} is true it is also merged into the persisted
+   * configuration.
+   *
+   * If {@code confirmMutation} receives ids in an order different from the
+   * one in which {@code logMutation} returned them, the outcome depends on
+   * the implementation.
+   * @param id id of mutation to be confirmed
+   * @param isValid if true, update persisted configuration with mutation
+   *                associated with {@code id}.
+   * @return true on success
+   */
+  boolean confirmMutation(long id, boolean isValid);
+
+  /**
+   * Returns the configuration currently persisted by the store.
+   * @return configuration as key-value
+   */
+  Configuration retrieve();
+
+  /**
+   * Lists the mutations that are still pending, in the order they were logged.
+   * @return list of mutations
+   */
+  List<LogMutation> getPendingMutations();
+
+  /**
+   * Lists the confirmed configuration mutations beginning at a given id.
+   * @param fromId id from which to start getting mutations, inclusive
+   * @return list of configuration mutations
+   */
+  List<LogMutation> getConfirmedConfHistory(long fromId);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70275b53/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
new file mode 100644
index 0000000..dff4e77
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestYarnConfigurationStore {
+
+  private static final String TEST_USER = "testUser";
+
+  private Configuration schedConf;
+  private YarnConfigurationStore confStore;
+
+  @Before
+  public void setUp() {
+    schedConf = new Configuration(false);
+    schedConf.set("key1", "val1");
+  }
+
+  @Test
+  public void testInMemoryConfigurationStore() {
+    confStore = new InMemoryConfigurationStore();
+    confStore.initialize(new Configuration(), schedConf);
+    assertEquals("val1", confStore.retrieve().get("key1"));
+
+    // A mutation confirmed as valid is applied and is no longer pending.
+    Map<String, String> valid = new HashMap<>();
+    valid.put("keyUpdate1", "valUpdate1");
+    long validId = confStore.logMutation(new LogMutation(valid, TEST_USER));
+    assertEquals(1, confStore.getPendingMutations().size());
+    confStore.confirmMutation(validId, true);
+    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
+    assertEquals(0, confStore.getPendingMutations().size());
+
+    // A mutation confirmed as invalid is dropped without being applied.
+    Map<String, String> invalid = new HashMap<>();
+    invalid.put("keyUpdate2", "valUpdate2");
+    long invalidId = confStore.logMutation(new LogMutation(invalid, TEST_USER));
+    assertEquals(1, confStore.getPendingMutations().size());
+    confStore.confirmMutation(invalidId, false);
+    assertNull("Configuration should not be updated",
+        confStore.retrieve().get("keyUpdate2"));
+    assertEquals(0, confStore.getPendingMutations().size());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org