You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/02 21:21:15 UTC
[31/48] hadoop git commit: HDFS-10882. Federation State Store
Interface API. Contributed by Jason Kace and Inigo Goiri.
HDFS-10882. Federation State Store Interface API. Contributed by Jason Kace and Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/190510f3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/190510f3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/190510f3
Branch: refs/heads/HDFS-10467
Commit: 190510f323364e166a4d3ce61653782b96f32567
Parents: 9176c5a
Author: Inigo <in...@apache.org>
Authored: Thu Apr 6 19:18:52 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Sat Sep 2 14:20:08 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 ++
.../server/federation/store/RecordStore.java | 100 ++++++++++++++++
.../store/driver/StateStoreSerializer.java | 119 +++++++++++++++++++
.../driver/impl/StateStoreSerializerPBImpl.java | 115 ++++++++++++++++++
.../store/records/impl/pb/PBRecord.java | 47 ++++++++
.../store/records/impl/pb/package-info.java | 29 +++++
.../src/main/resources/hdfs-default.xml | 8 ++
7 files changed, 429 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/190510f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index ce0a17a..7623839 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
import org.apache.hadoop.http.HttpConfig;
/**
@@ -1123,6 +1124,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT =
"org.apache.hadoop.hdfs.server.federation.MockResolver";
+ // HDFS Router-based federation State Store
+ public static final String FEDERATION_STORE_PREFIX =
+ FEDERATION_ROUTER_PREFIX + "store.";
+
+ public static final String FEDERATION_STORE_SERIALIZER_CLASS =
+ DFSConfigKeys.FEDERATION_STORE_PREFIX + "serializer";
+ public static final Class<StateStoreSerializerPBImpl>
+ FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT =
+ StateStoreSerializerPBImpl.class;
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
http://git-wip-us.apache.org/repos/asf/hadoop/blob/190510f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
new file mode 100644
index 0000000..524f432
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+
+/**
+ * Store records in the State Store. Subclasses provide interfaces to operate on
+ * those records.
+ *
+ * @param <R> Record to store by this interface.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RecordStore<R extends BaseRecord> {
+
+ private static final Log LOG = LogFactory.getLog(RecordStore.class);
+
+
+ /** Class of the record stored in this State Store. */
+ private final Class<R> recordClass;
+
+ /** State store driver backed by persistent storage. */
+ private final StateStoreDriver driver;
+
+
+ /**
+ * Create a new store for records.
+ *
+ * @param clazz Class of the record to store.
+ * @param stateStoreDriver Driver for the State Store.
+ */
+ protected RecordStore(Class<R> clazz, StateStoreDriver stateStoreDriver) {
+ this.recordClass = clazz;
+ this.driver = stateStoreDriver;
+ }
+
+ /**
+ * Report a required record to the data store. The data store uses this to
+ * create/maintain storage for the record.
+ *
+ * @return The class of the required record or null if no record is required
+ * for this interface.
+ */
+ public Class<R> getRecordClass() {
+ return this.recordClass;
+ }
+
+ /**
+ * Get the State Store driver.
+ *
+ * @return State Store driver.
+ */
+ protected StateStoreDriver getDriver() {
+ return this.driver;
+ }
+
+ /**
+ * Build a state store API implementation interface.
+ *
+ * @param interfaceClass The specific interface implementation to create
+ * @param driver The {@link StateStoreDriver} implementation in use.
+ * @return An initialized instance of the specified state store API
+ * implementation.
+ */
+ public static <T extends RecordStore<?>> T newInstance(
+ final Class<T> clazz, final StateStoreDriver driver) {
+
+ try {
+ Constructor<T> constructor = clazz.getConstructor(StateStoreDriver.class);
+ T recordStore = constructor.newInstance(driver);
+ return recordStore;
+ } catch (Exception e) {
+ LOG.error("Cannot create new instance for " + clazz, e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/190510f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
new file mode 100644
index 0000000..8540405
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Serializer to store and retrieve data in the State Store.
+ */
+public abstract class StateStoreSerializer {
+
+ /** Singleton for the serializer instance. */
+ private static StateStoreSerializer defaultSerializer;
+
+ /**
+ * Get the default serializer based.
+ * @return Singleton serializer.
+ */
+ public static StateStoreSerializer getSerializer() {
+ return getSerializer(null);
+ }
+
+ /**
+ * Get a serializer based on the provided configuration.
+ * @param conf Configuration. Default if null.
+ * @return Singleton serializer.
+ */
+ public static StateStoreSerializer getSerializer(Configuration conf) {
+ if (conf == null) {
+ synchronized (StateStoreSerializer.class) {
+ if (defaultSerializer == null) {
+ conf = new Configuration();
+ defaultSerializer = newSerializer(conf);
+ }
+ }
+ return defaultSerializer;
+ } else {
+ return newSerializer(conf);
+ }
+ }
+
+ private static StateStoreSerializer newSerializer(final Configuration conf) {
+ Class<? extends StateStoreSerializer> serializerName = conf.getClass(
+ DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS,
+ DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT,
+ StateStoreSerializer.class);
+ return ReflectionUtils.newInstance(serializerName, conf);
+ }
+
+ /**
+ * Create a new record.
+ * @param clazz Class of the new record.
+ * @return New record.
+ */
+ public static <T> T newRecord(Class<T> clazz) {
+ return getSerializer(null).newRecordInstance(clazz);
+ }
+
+ /**
+ * Create a new record.
+ * @param clazz Class of the new record.
+ * @return New record.
+ */
+ public abstract <T> T newRecordInstance(Class<T> clazz);
+
+ /**
+ * Serialize a record into a byte array.
+ * @param record Record to serialize.
+ * @return Byte array with the serialized record.
+ */
+ public abstract byte[] serialize(BaseRecord record);
+
+ /**
+ * Serialize a record into a string.
+ * @param record Record to serialize.
+ * @return String with the serialized record.
+ */
+ public abstract String serializeString(BaseRecord record);
+
+ /**
+ * Deserialize a bytes array into a record.
+ * @param byteArray Byte array to deserialize.
+ * @param clazz Class of the record.
+ * @return New record.
+ * @throws IOException If it cannot deserialize the record.
+ */
+ public abstract <T extends BaseRecord> T deserialize(
+ byte[] byteArray, Class<T> clazz) throws IOException;
+
+ /**
+ * Deserialize a string into a record.
+ * @param data String with the data to deserialize.
+ * @param clazz Class of the record.
+ * @return New record.
+ * @throws IOException If it cannot deserialize the record.
+ */
+ public abstract <T extends BaseRecord> T deserialize(
+ String data, Class<T> clazz) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/190510f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
new file mode 100644
index 0000000..45c5dd6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the State Store serializer.
+ */
+public final class StateStoreSerializerPBImpl extends StateStoreSerializer {
+
+ private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
+ private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";
+
+ private Configuration localConf = new Configuration();
+
+
+ private StateStoreSerializerPBImpl() {
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> T newRecordInstance(Class<T> clazz) {
+ try {
+ String clazzPBImpl = getPBImplClassName(clazz);
+ Class<?> pbClazz = localConf.getClassByName(clazzPBImpl);
+ Object retObject = ReflectionUtils.newInstance(pbClazz, localConf);
+ return (T)retObject;
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getPBImplClassName(Class<?> clazz) {
+ String srcPackagePart = getPackageName(clazz);
+ String srcClassName = getClassName(clazz);
+ String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
+ String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
+ return destPackagePart + "." + destClassPart;
+ }
+
+ private String getClassName(Class<?> clazz) {
+ String fqName = clazz.getName();
+ return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
+ }
+
+ private String getPackageName(Class<?> clazz) {
+ return clazz.getPackage().getName();
+ }
+
+ @Override
+ public byte[] serialize(BaseRecord record) {
+ byte[] byteArray64 = null;
+ if (record instanceof PBRecord) {
+ PBRecord recordPB = (PBRecord) record;
+ Message msg = recordPB.getProto();
+ byte[] byteArray = msg.toByteArray();
+ byteArray64 = Base64.encodeBase64(byteArray, false);
+ }
+ return byteArray64;
+ }
+
+ @Override
+ public String serializeString(BaseRecord record) {
+ byte[] byteArray64 = serialize(record);
+ String base64Encoded = StringUtils.newStringUtf8(byteArray64);
+ return base64Encoded;
+ }
+
+ @Override
+ public <T extends BaseRecord> T deserialize(
+ byte[] byteArray, Class<T> clazz) throws IOException {
+
+ T record = newRecord(clazz);
+ if (record instanceof PBRecord) {
+ PBRecord pbRecord = (PBRecord) record;
+ byte[] byteArray64 = Base64.encodeBase64(byteArray, false);
+ String base64Encoded = StringUtils.newStringUtf8(byteArray64);
+ pbRecord.readInstance(base64Encoded);
+ }
+ return record;
+ }
+
+ @Override
+ public <T extends BaseRecord> T deserialize(String data, Class<T> clazz)
+ throws IOException {
+ byte[] byteArray64 = Base64.decodeBase64(data);
+ return deserialize(byteArray64, clazz);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/190510f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java
new file mode 100644
index 0000000..c369275
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb;
+
+import java.io.IOException;
+
+import com.google.protobuf.Message;
+
+/**
+ * A record implementation using Protobuf.
+ */
+public interface PBRecord {
+
+ /**
+ * Get the protocol for the record.
+ * @return The protocol for this record.
+ */
+ Message getProto();
+
+ /**
+ * Set the protocol for the record.
+ * @param proto Protocol for this record.
+ */
+ void setProto(Message proto);
+
+ /**
+ * Populate this record with serialized data.
+ * @param base64String Serialized data in base64.
+ * @throws IOException If it cannot read the data.
+ */
+ void readInstance(String base64String) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/190510f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java
new file mode 100644
index 0000000..b329732
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The protobuf implementations of state store data records defined in the
+ * org.apache.hadoop.hdfs.server.federation.store.records package. Each
+ * implementation wraps an associated protobuf proto definition.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/190510f3/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 6e31388..f1dce6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4635,4 +4635,12 @@
</description>
</property>
+ <property>
+ <name>dfs.federation.router.store.serializer</name>
+ <value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl</value>
+ <description>
+ Class to serialize State Store records.
+ </description>
+ </property>
+
</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org