You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/02 21:21:17 UTC
[33/48] hadoop git commit: HDFS-10630. Federation State Store FS
Implementation. Contributed by Jason Kace and Inigo Goiri.
HDFS-10630. Federation State Store FS Implementation. Contributed by Jason Kace and Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9203e566
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9203e566
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9203e566
Branch: refs/heads/HDFS-10467
Commit: 9203e5666fb856f44ac85341791c24064a4a74a7
Parents: 190510f
Author: Inigo Goiri <in...@apache.org>
Authored: Tue May 2 15:49:53 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Sat Sep 2 14:20:08 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 +
.../federation/router/PeriodicService.java | 198 ++++++++
.../StateStoreConnectionMonitorService.java | 67 +++
.../federation/store/StateStoreService.java | 152 +++++-
.../federation/store/StateStoreUtils.java | 51 +-
.../store/driver/StateStoreDriver.java | 31 +-
.../driver/StateStoreRecordOperations.java | 17 +-
.../store/driver/impl/StateStoreBaseImpl.java | 31 +-
.../driver/impl/StateStoreFileBaseImpl.java | 429 ++++++++++++++++
.../store/driver/impl/StateStoreFileImpl.java | 161 +++++++
.../driver/impl/StateStoreFileSystemImpl.java | 178 +++++++
.../driver/impl/StateStoreSerializableImpl.java | 77 +++
.../federation/store/records/BaseRecord.java | 20 +-
.../server/federation/store/records/Query.java | 66 +++
.../src/main/resources/hdfs-default.xml | 16 +
.../store/FederationStateStoreTestUtils.java | 232 +++++++++
.../store/driver/TestStateStoreDriverBase.java | 483 +++++++++++++++++++
.../store/driver/TestStateStoreFile.java | 64 +++
.../store/driver/TestStateStoreFileSystem.java | 88 ++++
19 files changed, 2329 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7623839..8cdd450 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -25,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
import org.apache.hadoop.http.HttpConfig;
@@ -1134,6 +1138,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT =
StateStoreSerializerPBImpl.class;
+ public static final String FEDERATION_STORE_DRIVER_CLASS =
+ FEDERATION_STORE_PREFIX + "driver.class";
+ public static final Class<? extends StateStoreDriver>
+ FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreFileImpl.class;
+
+ public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
+ FEDERATION_STORE_PREFIX + "connection.test";
+ public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT =
+ TimeUnit.MINUTES.toMillis(1);
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
new file mode 100644
index 0000000..5e12222
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Service to periodically execute a runnable.
+ */
+public abstract class PeriodicService extends AbstractService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PeriodicService.class);
+
+ /** Default interval in milliseconds for the periodic service. */
+ private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
+
+
+ /** Interval for running the periodic service in milliseconds. */
+ private long intervalMs;
+ /** Name of the service. */
+ private final String serviceName;
+
+ /** Scheduler for the periodic service. */
+ private final ScheduledExecutorService scheduler;
+
+ /** If the service is running. */
+ private volatile boolean isRunning = false;
+
+ /** How many times we run. */
+ private long runCount;
+ /** How many errors we got. */
+ private long errorCount;
+ /** When was the last time we executed this service successfully. */
+ private long lastRun;
+
+ /**
+ * Create a new periodic update service.
+ *
+ * @param name Name of the service.
+ */
+ public PeriodicService(String name) {
+ this(name, DEFAULT_INTERVAL_MS);
+ }
+
+ /**
+ * Create a new periodic update service.
+ *
+ * @param name Name of the service.
+ * @param interval Interval for the periodic service in milliseconds.
+ */
+ public PeriodicService(String name, long interval) {
+ super(name);
+ this.serviceName = name;
+ this.intervalMs = interval;
+
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(this.getName() + "-%d")
+ .build();
+ this.scheduler = Executors.newScheduledThreadPool(1, threadFactory);
+ }
+
+ /**
+ * Set the interval for the periodic service.
+ *
+ * @param interval Interval in milliseconds.
+ */
+ protected void setIntervalMs(long interval) {
+ if (getServiceState() == STATE.STARTED) {
+ throw new ServiceStateException("Periodic service already started");
+ } else {
+ this.intervalMs = interval;
+ }
+ }
+
+ /**
+ * Get the interval for the periodic service.
+ *
+ * @return Interval in milliseconds.
+ */
+ protected long getIntervalMs() {
+ return this.intervalMs;
+ }
+
+ /**
+ * Get how many times we failed to run the periodic service.
+ *
+ * @return Times we failed to run the periodic service.
+ */
+ protected long getErrorCount() {
+ return this.errorCount;
+ }
+
+ /**
+ * Get how many times we run the periodic service.
+ *
+ * @return Times we run the periodic service.
+ */
+ protected long getRunCount() {
+ return this.runCount;
+ }
+
+ /**
+ * Get the last time the periodic service was executed.
+ *
+ * @return Last time the periodic service was executed.
+ */
+ protected long getLastUpdate() {
+ return this.lastRun;
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ LOG.info("Starting periodic service {}", this.serviceName);
+ startPeriodic();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ stopPeriodic();
+ LOG.info("Stopping periodic service {}", this.serviceName);
+ super.serviceStop();
+ }
+
+ /**
+ * Stop the periodic task.
+ */
+ protected synchronized void stopPeriodic() {
+ if (this.isRunning) {
+ LOG.info("{} is shutting down", this.serviceName);
+ this.isRunning = false;
+ this.scheduler.shutdownNow();
+ }
+ }
+
+ /**
+ * Start the periodic execution.
+ */
+ protected synchronized void startPeriodic() {
+ stopPeriodic();
+
+ // Create the runnable service
+ Runnable updateRunnable = new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("Running {} update task", serviceName);
+ try {
+ if (!isRunning) {
+ return;
+ }
+ periodicInvoke();
+ runCount++;
+ lastRun = Time.now();
+ } catch (Exception ex) {
+ errorCount++;
+ LOG.warn(serviceName + " service threw an exception", ex);
+ }
+ }
+ };
+
+ // Start the execution of the periodic service
+ this.isRunning = true;
+ this.scheduler.scheduleWithFixedDelay(
+ updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Method that the service will run periodically.
+ */
+ protected abstract void periodicInvoke();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
new file mode 100644
index 0000000..4d279c5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically monitor the connection of the StateStore
+ * {@link StateStoreService} data store and to re-open the connection
+ * to the data store if required.
+ */
+public class StateStoreConnectionMonitorService extends PeriodicService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreConnectionMonitorService.class);
+
+ /** Service that maintains the State Store connection. */
+ private final StateStoreService stateStore;
+
+
+ /**
+ * Create a new service to monitor the connectivity of the state store driver.
+ *
+ * @param store Instance of the state store to be monitored.
+ */
+ public StateStoreConnectionMonitorService(StateStoreService store) {
+ super(StateStoreConnectionMonitorService.class.getSimpleName());
+ this.stateStore = store;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.setIntervalMs(conf.getLong(
+ DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+ DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT));
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void periodicInvoke() {
+ LOG.debug("Checking state store connection");
+ if (!stateStore.isDriverReady()) {
+ LOG.info("Attempting to open state store driver.");
+ stateStore.loadDriver();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index 866daa3..df207e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -15,45 +15,168 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hdfs.server.federation.store;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* A service to initialize a
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
- * StateStoreDriver} and maintain the connection to the data store. There
- * are multiple state store driver connections supported:
+ * StateStoreDriver} and maintain the connection to the data store. There are
+ * multiple state store driver connections supported:
* <ul>
- * <li>File {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
+ * <li>File
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
* StateStoreFileImpl StateStoreFileImpl}
- * <li>ZooKeeper {@link org.apache.hadoop.hdfs.server.federation.store.driver.
- * impl.StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
+ * <li>ZooKeeper
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
+ * StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
* </ul>
* <p>
- * The service also supports the dynamic registration of data interfaces such as
- * the following:
+ * The service also supports the dynamic registration of record stores like:
* <ul>
- * <li>{@link MembershipStateStore}: state of the Namenodes in the
+ * <li>{@link MembershipStore}: state of the Namenodes in the
* federation.
* <li>{@link MountTableStore}: Mount table between to subclusters.
* See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
- * <li>{@link RouterStateStore}: State of the routers in the federation.
* </ul>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class StateStoreService extends CompositeService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreService.class);
+
+
+ /** State Store configuration. */
+ private Configuration conf;
+
/** Identifier for the service. */
private String identifier;
- // Stub class
- public StateStoreService(String name) {
- super(name);
+ /** Driver for the back end connection. */
+ private StateStoreDriver driver;
+
+ /** Service to maintain data store connection. */
+ private StateStoreConnectionMonitorService monitorService;
+
+
+ public StateStoreService() {
+ super(StateStoreService.class.getName());
+ }
+
+ /**
+ * Initialize the State Store and the connection to the backend.
+ *
+ * @param config Configuration for the State Store.
+ * @throws IOException
+ */
+ @Override
+ protected void serviceInit(Configuration config) throws Exception {
+ this.conf = config;
+
+ // Create implementation of State Store
+ Class<? extends StateStoreDriver> driverClass = this.conf.getClass(
+ DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+ DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT,
+ StateStoreDriver.class);
+ this.driver = ReflectionUtils.newInstance(driverClass, this.conf);
+
+ if (this.driver == null) {
+ throw new IOException("Cannot create driver for the State Store");
+ }
+
+ // Check the connection to the State Store periodically
+ this.monitorService = new StateStoreConnectionMonitorService(this);
+ this.addService(monitorService);
+
+ super.serviceInit(this.conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ loadDriver();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ closeDriver();
+
+ super.serviceStop();
+ }
+
+ /**
+ * List of records supported by this State Store.
+ *
+ * @return List of supported record classes.
+ */
+ public Collection<Class<? extends BaseRecord>> getSupportedRecords() {
+ // TODO add list of records
+ return new LinkedList<>();
+ }
+
+ /**
+ * Load the State Store driver. If successful, refresh cached data tables.
+ */
+ public void loadDriver() {
+ synchronized (this.driver) {
+ if (!isDriverReady()) {
+ String driverName = this.driver.getClass().getSimpleName();
+ if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) {
+ LOG.info("Connection to the State Store driver {} is open and ready",
+ driverName);
+ } else {
+ LOG.error("Cannot initialize State Store driver {}", driverName);
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the driver is ready to be used.
+ *
+ * @return If the driver is ready.
+ */
+ public boolean isDriverReady() {
+ return this.driver.isDriverReady();
+ }
+
+ /**
+ * Manually shuts down the driver.
+ *
+ * @throws Exception If the driver cannot be closed.
+ */
+ @VisibleForTesting
+ public void closeDriver() throws Exception {
+ if (this.driver != null) {
+ this.driver.close();
+ }
+ }
+
+ /**
+ * Get the state store driver.
+ *
+ * @return State store driver.
+ */
+ public StateStoreDriver getDriver() {
+ return this.driver;
}
/**
@@ -74,4 +197,5 @@ public class StateStoreService extends CompositeService {
public void setIdentifier(String id) {
this.identifier = id;
}
-}
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
index 8c681df..0a36619 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
@@ -17,17 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.federation.store;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Set of utility functions used to query, create, update and delete data
- * records in the state store.
+ * Set of utility functions used to work with the State Store.
*/
public final class StateStoreUtils {
- private static final Log LOG = LogFactory.getLog(StateStoreUtils.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreUtils.class);
+
private StateStoreUtils() {
// Utility class
@@ -52,7 +57,7 @@ public final class StateStoreUtils {
// Check if we went too far
if (actualClazz.equals(BaseRecord.class)) {
- LOG.error("We went too far (" + actualClazz + ") with " + clazz);
+ LOG.error("We went too far ({}) with {}", actualClazz, clazz);
actualClazz = clazz;
}
return actualClazz;
@@ -69,4 +74,36 @@ public final class StateStoreUtils {
Class<? extends BaseRecord> getRecordClass(final T record) {
return getRecordClass(record.getClass());
}
-}
+
+ /**
+ * Get the base class name for a record. If we get an implementation of a
+ * record we will return the real parent record class.
+ *
+ * @param clazz Class of the data record to check.
+ * @return Name of the base class for the record.
+ */
+ public static <T extends BaseRecord> String getRecordName(
+ final Class<T> clazz) {
+ return getRecordClass(clazz).getSimpleName();
+ }
+
+ /**
+ * Filters a list of records to find all records matching the query.
+ *
+ * @param query Map of field names and objects to use to filter results.
+ * @param records List of data records to filter.
+ * @return List of all records matching the query (or empty list if none
+ * match), null if the data set could not be filtered.
+ */
+ public static <T extends BaseRecord> List<T> filterMultiple(
+ final Query<T> query, final Iterable<T> records) {
+
+ List<T> matchingList = new ArrayList<>();
+ for (T record : records) {
+ if (query.matches(record)) {
+ matchingList.add(record);
+ }
+ }
+ return matchingList;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
index a1527df..90111bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
@@ -18,15 +18,16 @@
package org.apache.hadoop.hdfs.server.federation.store.driver;
import java.net.InetAddress;
-import java.util.List;
+import java.util.Collection;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Driver class for an implementation of a {@link StateStoreService}
@@ -35,7 +36,8 @@ import org.apache.hadoop.util.Time;
*/
public abstract class StateStoreDriver implements StateStoreRecordOperations {
- private static final Log LOG = LogFactory.getLog(StateStoreDriver.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreDriver.class);
/** State Store configuration. */
@@ -47,13 +49,14 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
/**
* Initialize the state store connection.
+ *
* @param config Configuration for the driver.
* @param id Identifier for the driver.
* @param records Records that are supported.
* @return If initialized and ready, false if failed to initialize driver.
*/
public boolean init(final Configuration config, final String id,
- final List<Class<? extends BaseRecord>> records) {
+ final Collection<Class<? extends BaseRecord>> records) {
this.conf = config;
this.identifier = id;
@@ -62,8 +65,20 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
LOG.warn("The identifier for the State Store connection is not set");
}
- // TODO stub
- return false;
+ boolean success = initDriver();
+ if (!success) {
+ LOG.error("Cannot intialize driver for {}", getDriverName());
+ return false;
+ }
+
+ for (Class<? extends BaseRecord> cls : records) {
+ String recordString = StateStoreUtils.getRecordName(cls);
+ if (!initRecordStorage(recordString, cls)) {
+ LOG.error("Cannot initialize record store for {}", cls.getSimpleName());
+ return false;
+ }
+ }
+ return true;
}
/**
@@ -169,4 +184,4 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
}
return hostname;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
index 739eeba..e76a733 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.hdfs.server.federation.store.driver;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
@@ -67,14 +67,14 @@ public interface StateStoreRecordOperations {
* Get a single record from the store that matches the query.
*
* @param clazz Class of record to fetch.
- * @param query Map of field names and objects to filter results.
+ * @param query Query to filter results.
* @return A single record matching the query. Null if there are no matching
* records or more than one matching record in the store.
* @throws IOException If multiple records match or if the data store cannot
* be queried.
*/
@Idempotent
- <T extends BaseRecord> T get(Class<T> clazz, Map<String, String> query)
+ <T extends BaseRecord> T get(Class<T> clazz, Query<T> query)
throws IOException;
/**
@@ -83,14 +83,14 @@ public interface StateStoreRecordOperations {
* supports filtering it should overwrite this method.
*
* @param clazz Class of record to fetch.
- * @param query Map of field names and objects to filter results.
+ * @param query Query to filter results.
* @return Records of type clazz that match the query or empty list if none
* are found.
* @throws IOException Throws exception if unable to query the data store.
*/
@Idempotent
<T extends BaseRecord> List<T> getMultiple(
- Class<T> clazz, Map<String, String> query) throws IOException;
+ Class<T> clazz, Query<T> query) throws IOException;
/**
* Creates a single record. Optionally updates an existing record with same
@@ -152,13 +152,12 @@ public interface StateStoreRecordOperations {
* Remove multiple records of a specific class that match a query. Requires
* the getAll implementation to fetch fresh records on each call.
*
- * @param clazz Class of record to remove.
- * @param filter matching filter to remove.
+ * @param query Query to filter what to remove.
* @return The number of records removed.
* @throws IOException Throws exception if unable to query the data store.
*/
@AtMostOnce
- <T extends BaseRecord> int remove(Class<T> clazz, Map<String, String> filter)
+ <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
throws IOException;
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
index b711fa9..1bd35f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
@@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
/**
* Base implementation of a State Store driver. It contains default
@@ -41,7 +44,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
@Override
public <T extends BaseRecord> T get(
- Class<T> clazz, Map<String, String> query) throws IOException {
+ Class<T> clazz, Query<T> query) throws IOException {
List<T> records = getMultiple(clazz, query);
if (records.size() > 1) {
throw new IOException("Found more than one object in collection");
@@ -53,17 +56,31 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
}
@Override
+ public <T extends BaseRecord> List<T> getMultiple(
+ Class<T> clazz, Query<T> query) throws IOException {
+ QueryResult<T> result = get(clazz);
+ List<T> records = result.getRecords();
+ List<T> ret = filterMultiple(query, records);
+ if (ret == null) {
+ throw new IOException("Cannot fetch records from the store");
+ }
+ return ret;
+ }
+
+ @Override
public <T extends BaseRecord> boolean put(
T record, boolean allowUpdate, boolean errorIfExists) throws IOException {
- List<T> singletonList = new ArrayList<T>();
+ List<T> singletonList = new ArrayList<>();
singletonList.add(record);
return putAll(singletonList, allowUpdate, errorIfExists);
}
@Override
public <T extends BaseRecord> boolean remove(T record) throws IOException {
- Map<String, String> primaryKeys = record.getPrimaryKeys();
- Class<? extends BaseRecord> clazz = StateStoreUtils.getRecordClass(record);
- return remove(clazz, primaryKeys) == 1;
+ final Query<T> query = new Query<T>(record);
+ Class<? extends BaseRecord> clazz = record.getClass();
+ @SuppressWarnings("unchecked")
+ Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
+ return remove(recordClass, query) == 1;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
new file mode 100644
index 0000000..d7c00ff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StateStoreDriver} implementation based on a local file.
+ */
+public abstract class StateStoreFileBaseImpl
+ extends StateStoreSerializableImpl {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
+
+ /** If it is initialized. */
+ private boolean initialized = false;
+
+ /** Name of the file containing the data. */
+ private static final String DATA_FILE_NAME = "records.data";
+
+
+ /**
+ * Lock reading records.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract <T extends BaseRecord> void lockRecordRead(Class<T> clazz);
+
+ /**
+ * Unlock reading records.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract <T extends BaseRecord> void unlockRecordRead(
+ Class<T> clazz);
+
+ /**
+ * Lock writing records.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract <T extends BaseRecord> void lockRecordWrite(
+ Class<T> clazz);
+
+ /**
+ * Unlock writing records.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract <T extends BaseRecord> void unlockRecordWrite(
+ Class<T> clazz);
+
+ /**
+ * Get the reader for the file system.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract <T extends BaseRecord> BufferedReader getReader(
+ Class<T> clazz, String sub);
+
+ /**
+ * Get the writer for the file system.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract <T extends BaseRecord> BufferedWriter getWriter(
+ Class<T> clazz, String sub);
+
+ /**
+ * Check if a path exists.
+ *
+ * @param path Path to check.
+ * @return If the path exists.
+ */
+ protected abstract boolean exists(String path);
+
+ /**
+ * Make a directory.
+ *
+ * @param path Path of the directory to create.
+ * @return If the directory was created.
+ */
+ protected abstract boolean mkdir(String path);
+
+ /**
+ * Get root directory.
+ *
+ * @return Root directory.
+ */
+ protected abstract String getRootDir();
+
+ /**
+ * Set the driver as initialized.
+ *
+ * @param ini If the driver is initialized.
+ */
+ public void setInitialized(boolean ini) {
+ this.initialized = ini;
+ }
+
+ @Override
+ public boolean initDriver() {
+ String rootDir = getRootDir();
+ try {
+ if (rootDir == null) {
+ LOG.error("Invalid root directory, unable to initialize driver.");
+ return false;
+ }
+
+ // Check root path
+ if (!exists(rootDir)) {
+ if (!mkdir(rootDir)) {
+ LOG.error("Cannot create State Store root directory {}", rootDir);
+ return false;
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error(
+ "Cannot initialize filesystem using root directory {}", rootDir, ex);
+ return false;
+ }
+ setInitialized(true);
+ return true;
+ }
+
+ @Override
+ public <T extends BaseRecord> boolean initRecordStorage(
+ String className, Class<T> recordClass) {
+
+ String dataDirPath = getRootDir() + "/" + className;
+ try {
+ // Create data directories for files
+ if (!exists(dataDirPath)) {
+ LOG.info("{} data directory doesn't exist, creating it", dataDirPath);
+ if (!mkdir(dataDirPath)) {
+ LOG.error("Cannot create data directory {}", dataDirPath);
+ return false;
+ }
+ String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME;
+ if (!exists(dataFilePath)) {
+ // Create empty file
+ List<T> emtpyList = new ArrayList<>();
+ if(!writeAll(emtpyList, recordClass)) {
+ LOG.error("Cannot create data file {}", dataFilePath);
+ return false;
+ }
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("Cannot create data directory {}", dataDirPath, ex);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Read all lines from a file and deserialize into the desired record type.
+ *
+ * @param reader Open handle for the file.
+ * @param recordClass Record class to create.
+ * @param includeDates True if dateModified/dateCreated are serialized.
+ * @return List of records.
+ * @throws IOException
+ */
+ private <T extends BaseRecord> List<T> getAllFile(
+ BufferedReader reader, Class<T> clazz, boolean includeDates)
+ throws IOException {
+
+ List<T> ret = new ArrayList<T>();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (!line.startsWith("#") && line.length() > 0) {
+ try {
+ T record = newRecord(line, clazz, includeDates);
+ ret.add(record);
+ } catch (Exception ex) {
+ LOG.error("Cannot parse line in data source file: {}", line, ex);
+ }
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
+ throws IOException {
+ return get(clazz, (String)null);
+ }
+
+ @Override
+ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
+ throws IOException {
+ verifyDriverReady();
+ BufferedReader reader = null;
+ lockRecordRead(clazz);
+ try {
+ reader = getReader(clazz, sub);
+ List<T> data = getAllFile(reader, clazz, true);
+ return new QueryResult<T>(data, getTime());
+ } catch (Exception ex) {
+ LOG.error("Cannot fetch records {}", clazz.getSimpleName());
+ throw new IOException("Cannot read from data store " + ex.getMessage());
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.error("Failed closing file", e);
+ }
+ }
+ unlockRecordRead(clazz);
+ }
+ }
+
+ /**
+ * Overwrite the existing data with a new data set.
+ *
+ * @param list List of records to write.
+ * @param writer BufferedWriter stream to write to.
+ * @return If the records were succesfully written.
+ */
+ private <T extends BaseRecord> boolean writeAllFile(
+ Collection<T> records, BufferedWriter writer) {
+
+ try {
+ for (BaseRecord record : records) {
+ try {
+ String data = serializeString(record);
+ writer.write(data);
+ writer.newLine();
+ } catch (IllegalArgumentException ex) {
+ LOG.error("Cannot write record {} to file", record, ex);
+ }
+ }
+ writer.flush();
+ return true;
+ } catch (IOException e) {
+ LOG.error("Cannot commit records to file", e);
+ return false;
+ }
+ }
+
+ /**
+ * Overwrite the existing data with a new data set. Replaces all records in
+ * the data store for this record class. If all records in the data store are
+ * not successfully committed, this function must return false and leave the
+ * data store unchanged.
+ *
+ * @param records List of records to write. All records must be of type
+ * recordClass.
+ * @param recordClass Class of record to replace.
+ * @return true if all operations were successful, false otherwise.
+ * @throws StateStoreUnavailableException
+ */
+ public <T extends BaseRecord> boolean writeAll(
+ Collection<T> records, Class<T> recordClass)
+ throws StateStoreUnavailableException {
+ verifyDriverReady();
+ lockRecordWrite(recordClass);
+ BufferedWriter writer = null;
+ try {
+ writer = getWriter(recordClass, null);
+ return writeAllFile(records, writer);
+ } catch (Exception e) {
+ LOG.error(
+ "Cannot add records to file for {}", recordClass.getSimpleName(), e);
+ return false;
+ } finally {
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error(
+ "Cannot close writer for {}", recordClass.getSimpleName(), e);
+ }
+ }
+ unlockRecordWrite(recordClass);
+ }
+ }
+
+ /**
+ * Get the data file name.
+ *
+ * @return Data file name.
+ */
+ protected String getDataFileName() {
+ return DATA_FILE_NAME;
+ }
+
+ @Override
+ public boolean isDriverReady() {
+ return this.initialized;
+ }
+
+ @Override
+ public <T extends BaseRecord> boolean putAll(
+ List<T> records, boolean allowUpdate, boolean errorIfExists)
+ throws StateStoreUnavailableException {
+ verifyDriverReady();
+
+ if (records.isEmpty()) {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ Class<T> clazz = (Class<T>) getRecordClass(records.get(0).getClass());
+ QueryResult<T> result;
+ try {
+ result = get(clazz);
+ } catch (IOException e) {
+ return false;
+ }
+ Map<Object, T> writeList = new HashMap<>();
+
+ // Write all of the existing records
+ for (T existingRecord : result.getRecords()) {
+ String key = existingRecord.getPrimaryKey();
+ writeList.put(key, existingRecord);
+ }
+
+ // Add inserts and updates, overwrite any existing values
+ for (T updatedRecord : records) {
+ try {
+ updatedRecord.validate();
+ String key = updatedRecord.getPrimaryKey();
+ if (writeList.containsKey(key) && allowUpdate) {
+ // Update
+ writeList.put(key, updatedRecord);
+ // Update the mod time stamp. Many backends will use their
+ // own timestamp for the mod time.
+ updatedRecord.setDateModified(this.getTime());
+ } else if (!writeList.containsKey(key)) {
+ // Insert
+ // Create/Mod timestamps are already initialized
+ writeList.put(key, updatedRecord);
+ } else if (errorIfExists) {
+ LOG.error("Attempt to insert record {} that already exists",
+ updatedRecord);
+ return false;
+ }
+ } catch (IllegalArgumentException ex) {
+ LOG.error("Cannot write invalid record to State Store", ex);
+ return false;
+ }
+ }
+
+ // Write all
+ boolean status = writeAll(writeList.values(), clazz);
+ return status;
+ }
+
+ @Override
+ public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
+ throws StateStoreUnavailableException {
+ verifyDriverReady();
+
+ if (query == null) {
+ return 0;
+ }
+
+ int removed = 0;
+ // Get the current records
+ try {
+ final QueryResult<T> result = get(clazz);
+ final List<T> existingRecords = result.getRecords();
+ // Write all of the existing records except those to be removed
+ final List<T> recordsToRemove = filterMultiple(query, existingRecords);
+ removed = recordsToRemove.size();
+ final List<T> newRecords = new LinkedList<>();
+ for (T record : existingRecords) {
+ if (!recordsToRemove.contains(record)) {
+ newRecords.add(record);
+ }
+ }
+ if (!writeAll(newRecords, clazz)) {
+ throw new IOException(
+ "Cannot remove record " + clazz + " query " + query);
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot remove records {} query {}", clazz, query, e);
+ }
+
+ return removed;
+ }
+
+ @Override
+ public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
+ throws StateStoreUnavailableException {
+ verifyDriverReady();
+ List<T> emptyList = new ArrayList<>();
+ boolean status = writeAll(emptyList, clazz);
+ return status;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
new file mode 100644
index 0000000..24e9660
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+/**
+ * StateStoreDriver implementation based on a local file.
+ */
+public class StateStoreFileImpl extends StateStoreFileBaseImpl {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreFileImpl.class);
+
+ /** Configuration keys. */
+ public static final String FEDERATION_STORE_FILE_DIRECTORY =
+ DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory";
+
+ /** Synchronization. */
+ private static final ReadWriteLock READ_WRITE_LOCK =
+ new ReentrantReadWriteLock();
+
+ /** Root directory for the state store. */
+ private String rootDirectory;
+
+
+ @Override
+ protected boolean exists(String path) {
+ File test = new File(path);
+ return test.exists();
+ }
+
+ @Override
+ protected boolean mkdir(String path) {
+ File dir = new File(path);
+ return dir.mkdirs();
+ }
+
+ @Override
+ protected String getRootDir() {
+ if (this.rootDirectory == null) {
+ String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY);
+ if (dir == null) {
+ File tempDir = Files.createTempDir();
+ dir = tempDir.getAbsolutePath();
+ }
+ this.rootDirectory = dir;
+ }
+ return this.rootDirectory;
+ }
+
+ @Override
+ protected <T extends BaseRecord> void lockRecordWrite(Class<T> recordClass) {
+ // TODO - Synchronize via FS
+ READ_WRITE_LOCK.writeLock().lock();
+ }
+
+ @Override
+ protected <T extends BaseRecord> void unlockRecordWrite(
+ Class<T> recordClass) {
+ // TODO - Synchronize via FS
+ READ_WRITE_LOCK.writeLock().unlock();
+ }
+
+ @Override
+ protected <T extends BaseRecord> void lockRecordRead(Class<T> recordClass) {
+ // TODO - Synchronize via FS
+ READ_WRITE_LOCK.readLock().lock();
+ }
+
+ @Override
+ protected <T extends BaseRecord> void unlockRecordRead(Class<T> recordClass) {
+ // TODO - Synchronize via FS
+ READ_WRITE_LOCK.readLock().unlock();
+ }
+
+ @Override
+ protected <T extends BaseRecord> BufferedReader getReader(
+ Class<T> clazz, String sub) {
+ String filename = StateStoreUtils.getRecordName(clazz);
+ if (sub != null && sub.length() > 0) {
+ filename += "/" + sub;
+ }
+ filename += "/" + getDataFileName();
+
+ try {
+ LOG.debug("Loading file: {}", filename);
+ File file = new File(getRootDir(), filename);
+ FileInputStream fis = new FileInputStream(file);
+ InputStreamReader isr =
+ new InputStreamReader(fis, StandardCharsets.UTF_8);
+ BufferedReader reader = new BufferedReader(isr);
+ return reader;
+ } catch (Exception ex) {
+ LOG.error(
+ "Cannot open read stream for record {}", clazz.getSimpleName(), ex);
+ return null;
+ }
+ }
+
+ @Override
+ protected <T extends BaseRecord> BufferedWriter getWriter(
+ Class<T> clazz, String sub) {
+ String filename = StateStoreUtils.getRecordName(clazz);
+ if (sub != null && sub.length() > 0) {
+ filename += "/" + sub;
+ }
+ filename += "/" + getDataFileName();
+
+ try {
+ File file = new File(getRootDir(), filename);
+ FileOutputStream fos = new FileOutputStream(file, false);
+ OutputStreamWriter osw =
+ new OutputStreamWriter(fos, StandardCharsets.UTF_8);
+ BufferedWriter writer = new BufferedWriter(osw);
+ return writer;
+ } catch (IOException ex) {
+ LOG.error(
+ "Cannot open read stream for record {}", clazz.getSimpleName(), ex);
+ return null;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ setInitialized(false);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
new file mode 100644
index 0000000..5968421
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StateStoreDriver} implementation based on a filesystem. The most common uses
+ * HDFS as a backend.
+ */
+public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreFileSystemImpl.class);
+
+
+ /** Configuration keys. */
+ public static final String FEDERATION_STORE_FS_PATH =
+ DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path";
+
+ /** File system to back the State Store. */
+ private FileSystem fs;
+ /** Working path in the filesystem. */
+ private String workPath;
+
+ @Override
+ protected boolean exists(String path) {
+ try {
+ return fs.exists(new Path(path));
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ protected boolean mkdir(String path) {
+ try {
+ return fs.mkdirs(new Path(path));
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ protected String getRootDir() {
+ if (this.workPath == null) {
+ String rootPath = getConf().get(FEDERATION_STORE_FS_PATH);
+ URI workUri;
+ try {
+ workUri = new URI(rootPath);
+ fs = FileSystem.get(workUri, getConf());
+ } catch (Exception ex) {
+ return null;
+ }
+ this.workPath = rootPath;
+ }
+ return this.workPath;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+
+ /**
+ * Get the folder path for the record class' data.
+ *
+ * @param cls Data record class.
+ * @return Path of the folder containing the record class' data files.
+ */
+ private Path getPathForClass(Class<? extends BaseRecord> clazz) {
+ if (clazz == null) {
+ return null;
+ }
+ // TODO extract table name from class: entry.getTableName()
+ String className = StateStoreUtils.getRecordName(clazz);
+ return new Path(workPath, className);
+ }
+
+ @Override
+ protected <T extends BaseRecord> void lockRecordRead(Class<T> clazz) {
+ // Not required, synced with HDFS leasing
+ }
+
+ @Override
+ protected <T extends BaseRecord> void unlockRecordRead(Class<T> clazz) {
+ // Not required, synced with HDFS leasing
+ }
+
+ @Override
+ protected <T extends BaseRecord> void lockRecordWrite(Class<T> clazz) {
+ // TODO -> wait for lease to be available
+ }
+
+ @Override
+ protected <T extends BaseRecord> void unlockRecordWrite(Class<T> clazz) {
+ // TODO -> ensure lease is closed for the file
+ }
+
+ @Override
+ protected <T extends BaseRecord> BufferedReader getReader(
+ Class<T> clazz, String sub) {
+
+ Path path = getPathForClass(clazz);
+ if (sub != null && sub.length() > 0) {
+ path = Path.mergePaths(path, new Path("/" + sub));
+ }
+ path = Path.mergePaths(path, new Path("/" + getDataFileName()));
+
+ try {
+ FSDataInputStream fdis = fs.open(path);
+ InputStreamReader isr =
+ new InputStreamReader(fdis, StandardCharsets.UTF_8);
+ BufferedReader reader = new BufferedReader(isr);
+ return reader;
+ } catch (IOException ex) {
+ LOG.error("Cannot open write stream for {} to {}",
+ clazz.getSimpleName(), path);
+ return null;
+ }
+ }
+
+ @Override
+ protected <T extends BaseRecord> BufferedWriter getWriter(
+ Class<T> clazz, String sub) {
+
+ Path path = getPathForClass(clazz);
+ if (sub != null && sub.length() > 0) {
+ path = Path.mergePaths(path, new Path("/" + sub));
+ }
+ path = Path.mergePaths(path, new Path("/" + getDataFileName()));
+
+ try {
+ FSDataOutputStream fdos = fs.create(path, true);
+ OutputStreamWriter osw =
+ new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
+ BufferedWriter writer = new BufferedWriter(osw);
+ return writer;
+ } catch (IOException ex) {
+ LOG.error("Cannot open write stream for {} to {}",
+ clazz.getSimpleName(), path);
+ return null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
new file mode 100644
index 0000000..e9b3fdf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+
+/**
+ * State Store driver that stores a serialization of the records. The serializer
+ * is pluggable.
+ */
+public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
+
+ /** Default serializer for this driver. */
+ private StateStoreSerializer serializer;
+
+
+ @Override
+ public boolean init(final Configuration config, final String id,
+ final Collection<Class<? extends BaseRecord>> records) {
+ boolean ret = super.init(config, id, records);
+
+ this.serializer = StateStoreSerializer.getSerializer(config);
+
+ return ret;
+ }
+
+ /**
+ * Serialize a record using the serializer.
+ * @param record Record to serialize.
+ * @return Byte array with the serialization of the record.
+ */
+ protected <T extends BaseRecord> byte[] serialize(T record) {
+ return serializer.serialize(record);
+ }
+
+ /**
+ * Serialize a record using the serializer.
+ * @param record Record to serialize.
+ * @return String with the serialization of the record.
+ */
+ protected <T extends BaseRecord> String serializeString(T record) {
+ return serializer.serializeString(record);
+ }
+
+ /**
+ * Creates a record from an input data string.
+ * @param data Serialized text of the record.
+ * @param clazz Record class.
+ * @param includeDates If dateModified and dateCreated are serialized.
+ * @return The created record.
+ * @throws IOException
+ */
+ protected <T extends BaseRecord> T newRecord(
+ String data, Class<T> clazz, boolean includeDates) throws IOException {
+ return serializer.deserialize(data, clazz);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
index 4192a3d..79f99c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
@@ -123,6 +123,24 @@ public abstract class BaseRecord implements Comparable<BaseRecord> {
}
/**
+ * Check if this record matches a partial record.
+ *
+ * @param other Partial record.
+ * @return If this record matches.
+ */
+ public boolean like(BaseRecord other) {
+ if (other == null) {
+ return false;
+ }
+ Map<String, String> thisKeys = this.getPrimaryKeys();
+ Map<String, String> otherKeys = other.getPrimaryKeys();
+ if (thisKeys == null) {
+ return otherKeys == null;
+ }
+ return thisKeys.equals(otherKeys);
+ }
+
+ /**
* Override equals check to use primary key(s) for comparison.
*/
@Override
@@ -186,4 +204,4 @@ public abstract class BaseRecord implements Comparable<BaseRecord> {
public String toString() {
return getPrimaryKey();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java
new file mode 100644
index 0000000..3c59abf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+/**
+ * Check if a record matches a query. The query is usually a partial record.
+ *
+ * @param <T> Type of the record to query.
+ */
+public class Query<T extends BaseRecord> {
+
+ /** Partial object to compare against. */
+ private final T partial;
+
+
+ /**
+ * Create a query to search for a partial record.
+ *
+ * @param partial It defines the attributes to search.
+ */
+ public Query(final T part) {
+ this.partial = part;
+ }
+
+ /**
+ * Get the partial record used to query.
+ *
+ * @return The partial record used for the query.
+ */
+ public T getPartial() {
+ return this.partial;
+ }
+
+ /**
+ * Check if a record matches the primary keys or the partial record.
+ *
+ * @param other Record to check.
+ * @return If the record matches. Don't match if there is no partial.
+ */
+ public boolean matches(T other) {
+ if (this.partial == null) {
+ return false;
+ }
+ return this.partial.like(other);
+ }
+
+ @Override
+ public String toString() {
+ return "Checking: " + this.partial;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index f1dce6b..a8410b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4643,4 +4643,20 @@
</description>
</property>
+ <property>
+ <name>dfs.federation.router.store.driver.class</name>
+ <value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl</value>
+ <description>
+ Class to implement the State Store. By default it uses the local disk.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.store.connection.test</name>
+ <value>60000</value>
+ <description>
+ How often to check for the connection to the State Store in milliseconds.
+ </description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203e566/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
new file mode 100644
index 0000000..fc5aebd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS;
+import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Utilities to test the State Store.
+ */
+public final class FederationStateStoreTestUtils {
+
+ private FederationStateStoreTestUtils() {
+ // Utility Class
+ }
+
+ /**
+ * Get the default State Store driver implementation.
+ *
+ * @return Class of the default State Store driver implementation.
+ */
+ public static Class<? extends StateStoreDriver> getDefaultDriver() {
+ return DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT;
+ }
+
+ /**
+ * Create a default State Store configuration.
+ *
+ * @return State Store configuration.
+ */
+ public static Configuration getStateStoreConfiguration() {
+ Class<? extends StateStoreDriver> clazz = getDefaultDriver();
+ return getStateStoreConfiguration(clazz);
+ }
+
+ /**
+ * Create a new State Store configuration for a particular driver.
+ *
+ * @param clazz Class of the driver to create.
+ * @return State Store configuration.
+ */
+ public static Configuration getStateStoreConfiguration(
+ Class<? extends StateStoreDriver> clazz) {
+ Configuration conf = new HdfsConfiguration(false);
+
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test");
+
+ conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class);
+
+ if (clazz.isAssignableFrom(StateStoreFileBaseImpl.class)) {
+ setFileConfiguration(conf);
+ }
+ return conf;
+ }
+
+ /**
+ * Create a new State Store based on a configuration.
+ *
+ * @param configuration Configuration for the State Store.
+ * @return New State Store service.
+ * @throws IOException If it cannot create the State Store.
+ * @throws InterruptedException If we cannot wait for the store to start.
+ */
+ public static StateStoreService getStateStore(
+ Configuration configuration) throws IOException, InterruptedException {
+
+ StateStoreService stateStore = new StateStoreService();
+ assertNotNull(stateStore);
+
+ // Set unique identifier, this is normally the router address
+ String identifier = UUID.randomUUID().toString();
+ stateStore.setIdentifier(identifier);
+
+ stateStore.init(configuration);
+ stateStore.start();
+
+ // Wait for state store to connect
+ waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10));
+
+ return stateStore;
+ }
+
+ /**
+ * Wait for the State Store to initialize its driver.
+ *
+ * @param stateStore State Store.
+ * @param timeoutMs Time out in milliseconds.
+ * @throws IOException If the State Store cannot be reached.
+ * @throws InterruptedException If the sleep is interrupted.
+ */
+ public static void waitStateStore(StateStoreService stateStore,
+ long timeoutMs) throws IOException, InterruptedException {
+ long startingTime = Time.monotonicNow();
+ while (!stateStore.isDriverReady()) {
+ Thread.sleep(100);
+ if (Time.monotonicNow() - startingTime > timeoutMs) {
+ throw new IOException("Timeout waiting for State Store to connect");
+ }
+ }
+ }
+
+ /**
+ * Delete the default State Store.
+ *
+ * @throws IOException
+ */
+ public static void deleteStateStore() throws IOException {
+ Class<? extends StateStoreDriver> driverClass = getDefaultDriver();
+ deleteStateStore(driverClass);
+ }
+
+ /**
+ * Delete the State Store.
+ * @param driverClass Class of the State Store driver implementation.
+ * @throws IOException If it cannot be deleted.
+ */
+ public static void deleteStateStore(
+ Class<? extends StateStoreDriver> driverClass) throws IOException {
+
+ if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) {
+ String workingDirectory = System.getProperty("user.dir");
+ File dir = new File(workingDirectory + "/statestore");
+ if (dir.exists()) {
+ FileUtils.cleanDirectory(dir);
+ }
+ }
+ }
+
+ /**
+ * Set the default configuration for drivers based on files.
+ *
+ * @param conf Configuration to extend.
+ */
+ public static void setFileConfiguration(Configuration conf) {
+ String workingPath = System.getProperty("user.dir");
+ String stateStorePath = workingPath + "/statestore";
+ conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath);
+ }
+
+ /**
+ * Clear all the records from the State Store.
+ *
+ * @param store State Store to remove records from.
+ * @return If the State Store was cleared.
+ * @throws IOException If it cannot clear the State Store.
+ */
+ public static boolean clearAllRecords(StateStoreService store)
+ throws IOException {
+ Collection<Class<? extends BaseRecord>> allRecords =
+ store.getSupportedRecords();
+ for (Class<? extends BaseRecord> recordType : allRecords) {
+ if (!clearRecords(store, recordType)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Clear records from a certain type from the State Store.
+ *
+ * @param store State Store to remove records from.
+ * @param recordClass Class of the records to remove.
+ * @return If the State Store was cleared.
+ * @throws IOException If it cannot clear the State Store.
+ */
+ public static <T extends BaseRecord> boolean clearRecords(
+ StateStoreService store, Class<T> recordClass) throws IOException {
+ List<T> emptyList = new ArrayList<>();
+ if (!synchronizeRecords(store, emptyList, recordClass)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Synchronize a set of records. Remove all and keep the ones specified.
+ *
+ * @param stateStore State Store service managing the driver.
+ * @param records Records to add.
+ * @param clazz Class of the record to synchronize.
+ * @return If the synchronization succeeded.
+ * @throws IOException If it cannot connect to the State Store.
+ */
+ public static <T extends BaseRecord> boolean synchronizeRecords(
+ StateStoreService stateStore, List<T> records, Class<T> clazz)
+ throws IOException {
+ StateStoreDriver driver = stateStore.getDriver();
+ driver.verifyDriverReady();
+ if (driver.removeAll(clazz)) {
+ if (driver.putAll(records, true, false)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org