You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/03/22 03:42:14 UTC
[45/50] [abbrv] hbase git commit: HBASE-14030 HBase Backup/Restore
Phase 1 (v42)
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
new file mode 100644
index 0000000..14235ce
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupContext;
+import org.apache.hadoop.hbase.backup.impl.BackupCopyService;
+import org.apache.hadoop.hbase.backup.impl.BackupHandler;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+/**
+ * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot,
+ * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper
+ * implementation. The other is copying for incremental log files, which bases on extending
+ * DistCp's function with copy progress reporting to ZooKeeper implementation.
+ *
+ * For now this is only a wrapper. The other features such as progress and increment backup will be
+ * implemented in future jira
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceBackupCopyService implements BackupCopyService {
+ private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class);
+
+ private Configuration conf;
+ // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024;
+
+ // Accumulated progress within the whole backup process for the copy operation
+ private float progressDone = 0.1f;
+ private long bytesCopied = 0;
+ private static float INIT_PROGRESS = 0.1f;
+
+ // The percentage of the current copy task within the whole task if multiple time copies are
+ // needed. The default value is 100%, which means only 1 copy task for the whole.
+ private float subTaskPercntgInWholeTask = 1f;
+
+ public MapReduceBackupCopyService() {
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Get the current copy task percentage within the whole task if multiple copies are needed.
+ * @return the current copy task percentage
+ */
+ public float getSubTaskPercntgInWholeTask() {
+ return subTaskPercntgInWholeTask;
+ }
+
+ /**
+ * Set the current copy task percentage within the whole task if multiple copies are needed. Must
+ * be called before calling
+ * {@link #copy(BackupHandler, Configuration, Type, String[])}
+ * @param subTaskPercntgInWholeTask The percentage of the copy subtask
+ */
+ public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
+ this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
+ }
+
+ class SnapshotCopy extends ExportSnapshot {
+ private BackupContext backupContext;
+ private TableName table;
+
+ public SnapshotCopy(BackupContext backupContext, TableName table) {
+ super();
+ this.backupContext = backupContext;
+ this.table = table;
+ }
+
+ public TableName getTable() {
+ return this.table;
+ }
+ }
+
+ // Extends DistCp for progress updating to hbase:backup
+ // during backup. Using DistCpV2 (MAPREDUCE-2765).
+ // Simply extend it and override execute() method to get the
+ // Job reference for progress updating.
+ // Only the argument "src1, [src2, [...]] dst" is supported,
+ // no more DistCp options.
+ class BackupDistCp extends DistCp {
+
+ private BackupContext backupContext;
+ private BackupManager backupManager;
+
+ public BackupDistCp(Configuration conf, DistCpOptions options, BackupContext backupContext,
+ BackupManager backupManager)
+ throws Exception {
+ super(conf, options);
+ this.backupContext = backupContext;
+ this.backupManager = backupManager;
+ }
+
+ @Override
+ public Job execute() throws Exception {
+
+ // reflection preparation for private methods and fields
+ Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
+ Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
+ Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
+ Method methodCreateInputFileListing =
+ classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
+ Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
+
+ Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
+ Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
+ Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+ Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
+
+ methodCreateMetaFolderPath.setAccessible(true);
+ methodCreateJob.setAccessible(true);
+ methodCreateInputFileListing.setAccessible(true);
+ methodCleanup.setAccessible(true);
+
+ fieldInputOptions.setAccessible(true);
+ fieldMetaFolder.setAccessible(true);
+ fieldJobFS.setAccessible(true);
+ fieldSubmitted.setAccessible(true);
+
+ // execute() logic starts here
+ assert fieldInputOptions.get(this) != null;
+ assert getConf() != null;
+
+ Job job = null;
+ try {
+ synchronized (this) {
+ // Don't cleanup while we are setting up.
+ fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
+ fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf()));
+
+ job = (Job) methodCreateJob.invoke(this);
+ }
+ methodCreateInputFileListing.invoke(this, job);
+
+ // Get the total length of the source files
+ List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
+ long totalSrcLgth = 0;
+ for (Path aSrc : srcs) {
+ totalSrcLgth += BackupUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc);
+ }
+
+ // submit the copy job
+ job.submit();
+ fieldSubmitted.set(this, true);
+
+ // after submit the MR job, set its handler in backup handler for cancel process
+ // this.backupHandler.copyJob = job;
+
+ // Update the copy progress to ZK every 0.5s if progress value changed
+ int progressReportFreq =
+ this.getConf().getInt("hbase.backup.progressreport.frequency", 500);
+ float lastProgress = progressDone;
+ while (!job.isComplete()) {
+ float newProgress =
+ progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+
+ if (newProgress > lastProgress) {
+
+ BigDecimal progressData =
+ new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+ String newProgressStr = progressData + "%";
+ LOG.info("Progress: " + newProgressStr);
+ BackupHandler.updateProgress(backupContext, backupManager, progressData.intValue(),
+ bytesCopied);
+ LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
+ + ".\"");
+ lastProgress = newProgress;
+ }
+ Thread.sleep(progressReportFreq);
+ }
+
+ // update the progress data after copy job complete
+ float newProgress =
+ progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+ BigDecimal progressData =
+ new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+
+ String newProgressStr = progressData + "%";
+ LOG.info("Progress: " + newProgressStr);
+
+ // accumulate the overall backup progress
+ progressDone = newProgress;
+ bytesCopied += totalSrcLgth;
+
+ BackupHandler.updateProgress(backupContext, backupManager, progressData.intValue(),
+ bytesCopied);
+ LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
+ + " - " + bytesCopied + " bytes copied.\"");
+
+ } finally {
+ if (!fieldSubmitted.getBoolean(this)) {
+ methodCleanup.invoke(this);
+ }
+ }
+
+ String jobID = job.getJobID().toString();
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+
+ LOG.debug("DistCp job-id: " + jobID);
+ return job;
+ }
+
+ }
+
+ /**
+ * Do backup copy based on different types.
+ * @param context The backup context
+ * @param conf The hadoop configuration
+ * @param copyType The backup copy type
+ * @param options Options for customized ExportSnapshot or DistCp
+ * @throws Exception exception
+ */
+ @Override
+ public int copy(BackupContext context, BackupManager backupManager, Configuration conf,
+ BackupCopyService.Type copyType, String[] options) throws IOException {
+ int res = 0;
+
+ try {
+ if (copyType == Type.FULL) {
+ SnapshotCopy snapshotCp =
+ new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
+ LOG.debug("Doing SNAPSHOT_COPY");
+ // Make a new instance of conf to be used by the snapshot copy class.
+ snapshotCp.setConf(new Configuration(conf));
+ res = snapshotCp.run(options);
+ } else if (copyType == Type.INCREMENTAL) {
+ LOG.debug("Doing COPY_TYPE_DISTCP");
+ setSubTaskPercntgInWholeTask(1f);
+
+ BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, context,
+ backupManager);
+ // Handle a special case where the source file is a single file.
+ // In this case, distcp will not create the target dir. It just take the
+ // target as a file name and copy source file to the target (as a file name).
+ // We need to create the target dir before run distcp.
+ LOG.debug("DistCp options: " + Arrays.toString(options));
+ if (options.length == 2) {
+ Path dest = new Path(options[1]);
+ FileSystem destfs = dest.getFileSystem(conf);
+ if (!destfs.exists(dest)) {
+ destfs.mkdirs(dest);
+ }
+ }
+
+ res = distcp.run(options);
+ }
+ return res;
+
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
new file mode 100644
index 0000000..203c9a3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupUtil;
+import org.apache.hadoop.hbase.backup.impl.IncrementalRestoreService;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceRestoreService implements IncrementalRestoreService {
+ public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class);
+
+ private WALPlayer player;
+
+ public MapReduceRestoreService() {
+ this.player = new WALPlayer();
+ }
+
+ @Override
+ public void run(String logDir, TableName[] tableNames, TableName[] newTableNames) throws IOException {
+ String tableStr = BackupUtil.join(tableNames);
+ String newTableStr = BackupUtil.join(newTableNames);
+
+ // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each
+ // log file
+
+ String[] playerArgs = { logDir, tableStr, newTableStr };
+ LOG.info("Restore incremental backup from directory " + logDir + " from hbase tables "
+ + BackupUtil.join(tableNames) + " to tables "
+ + BackupUtil.join(newTableNames));
+ try {
+ player.run(playerArgs);
+ } catch (Exception e) {
+ throw new IOException("cannot restore from backup directory " + logDir
+ + " (check Hadoop and HBase logs) " + e);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return player.getConf();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.player.setConf(conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
new file mode 100644
index 0000000..dae24a6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -0,0 +1,119 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.master;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+
+/**
+ * Implementation of a log cleaner that checks if a log is still scheduled for
+ * incremental backup before deleting it when its TTL is over.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupLogCleaner extends BaseLogCleanerDelegate {
+ private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class);
+
+ private boolean stopped = false;
+
+ public BackupLogCleaner() {
+ }
+
+ @Override
+ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+ // all members of this class are null if backup is disabled,
+ // so we cannot filter the files
+ if (this.getConf() == null) {
+ return files;
+ }
+
+ List<FileStatus> list = new ArrayList<FileStatus>();
+ // TODO: LogCleaners do not have a way to get the Connection from Master. We should find a
+ // way to pass it down here, so that this connection is not re-created every time.
+ // It is expensive
+ try(Connection connection = ConnectionFactory.createConnection(this.getConf());
+ final BackupSystemTable table = new BackupSystemTable(connection)) {
+
+ // If we do not have recorded backup sessions
+ if (!table.hasBackupSessions()) {
+ return files;
+ }
+
+ for(FileStatus file: files){
+ String wal = file.getPath().toString();
+ boolean logInSystemTable = table.checkWALFile(wal);
+ if(LOG.isDebugEnabled()) {
+ if(logInSystemTable) {
+ LOG.debug("Found log file in hbase:backup, deleting: " + wal);
+ list.add(file);
+ } else {
+ LOG.debug("Didn't find this log in hbase:backup, keeping: " + wal);
+ }
+ }
+ }
+ return list;
+ } catch (IOException e) {
+ LOG.error("Failed to get hbase:backup table, therefore will keep all files", e);
+ // nothing to delete
+ return new ArrayList<FileStatus>();
+ }
+ }
+
+ @Override
+ public void setConf(Configuration config) {
+ // If backup is disabled, keep all members null
+ if (!config.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT)) {
+ LOG.warn("Backup is disabled - allowing all wals to be deleted");
+ return;
+ }
+ super.setConf(config);
+ }
+
+ @Override
+ public void stop(String why) {
+ if (this.stopped) {
+ return;
+ }
+ this.stopped = true;
+ LOG.info("Stopping BackupLogCleaner");
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
new file mode 100644
index 0000000..f96682f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure.Procedure;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.zookeeper.KeeperException;
+
+public class LogRollMasterProcedureManager extends MasterProcedureManager {
+
+ public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
+ public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
+ private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
+
+ private MasterServices master;
+ private ProcedureCoordinator coordinator;
+ private boolean done;
+
+ @Override
+ public void stop(String why) {
+ LOG.info("stop: " + why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public void initialize(MasterServices master, MetricsMaster metricsMaster)
+ throws KeeperException, IOException, UnsupportedOperationException {
+ this.master = master;
+ this.done = false;
+
+ // setup the default procedure coordinator
+ String name = master.getServerName().toString();
+ ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
+ BaseCoordinatedStateManager coordManager =
+ (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
+ .getCoordinatedStateManager(master.getConfiguration());
+ coordManager.initialize(master);
+
+ ProcedureCoordinatorRpcs comms =
+ coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
+
+ this.coordinator = new ProcedureCoordinator(comms, tpool);
+ }
+
+ @Override
+ public String getProcedureSignature() {
+ return ROLLLOG_PROCEDURE_SIGNATURE;
+ }
+
+ @Override
+ public void execProcedure(ProcedureDescription desc) throws IOException {
+ this.done = false;
+ // start the process on the RS
+ ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+ List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
+ List<String> servers = new ArrayList<String>();
+ for (ServerName sn : serverNames) {
+ servers.add(sn.toString());
+ }
+ Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
+ if (proc == null) {
+ String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ try {
+ // wait for the procedure to complete. A timer thread is kicked off that should cancel this
+ // if it takes too long.
+ proc.waitForCompleted();
+ LOG.info("Done waiting - exec procedure for " + desc.getInstance());
+ LOG.info("Distributed roll log procedure is successful!");
+ this.done = true;
+ } catch (InterruptedException e) {
+ ForeignException ee =
+ new ForeignException("Interrupted while waiting for roll log procdure to finish", e);
+ monitor.receive(ee);
+ Thread.currentThread().interrupt();
+ } catch (ForeignException e) {
+ ForeignException ee =
+ new ForeignException("Exception while waiting for roll log procdure to finish", e);
+ monitor.receive(ee);
+ }
+ monitor.rethrowException();
+ }
+
+ @Override
+ public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+ return done;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
new file mode 100644
index 0000000..d524140
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+
+/**
+ * This backup subprocedure implementation forces a log roll on the RS.
+ */
+public class LogRollBackupSubprocedure extends Subprocedure {
+ private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class);
+
+ private final RegionServerServices rss;
+ private final LogRollBackupSubprocedurePool taskManager;
+ private FSHLog hlog;
+
+ public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
+ ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
+ LogRollBackupSubprocedurePool taskManager) {
+
+ super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
+ wakeFrequency, timeout);
+ LOG.info("Constructing a LogRollBackupSubprocedure.");
+ this.rss = rss;
+ this.taskManager = taskManager;
+ }
+
+ /**
+ * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified
+ * with no use of subprocedurepool.
+ */
+ class RSRollLogTask implements Callable<Void> {
+ RSRollLogTask() {
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("++ DRPC started: " + rss.getServerName());
+ }
+ hlog = (FSHLog) rss.getWAL(null);
+ long filenum = hlog.getFilenum();
+
+ LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum);
+ hlog.rollWriter(true);
+ LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum());
+
+ Connection connection = rss.getConnection();
+ try(final BackupSystemTable table = new BackupSystemTable(connection)) {
+ // sanity check, good for testing
+ HashMap<String, Long> serverTimestampMap = table.readRegionServerLastLogRollResult();
+ String host = rss.getServerName().getHostname();
+ int port = rss.getServerName().getPort();
+ String server = host + ":" + port;
+ Long sts = serverTimestampMap.get(host);
+ if (sts != null && sts > filenum) {
+ LOG.warn("Won't update server's last roll log result: current="
+ + sts + " new=" + filenum);
+ return null;
+ }
+ // write the log number to hbase:backup.
+ table.writeRegionServerLastLogRollResult(server, filenum);
+ return null;
+ } catch (Exception e) {
+ LOG.error(e);
+ throw e; // TODO: is this correct?
+ }
+ }
+ }
+
+ private void rolllog() throws ForeignException {
+ monitor.rethrowException();
+
+ taskManager.submitTask(new RSRollLogTask());
+ monitor.rethrowException();
+
+ // wait for everything to complete.
+ taskManager.waitForOutstandingTasks();
+ monitor.rethrowException();
+
+ }
+
+ @Override
+ public void acquireBarrier() throws ForeignException {
+ // do nothing, executing in inside barrier step.
+ }
+
+ /**
+ * do a log roll.
+ * @return some bytes
+ */
+ @Override
+ public byte[] insideBarrier() throws ForeignException {
+ rolllog();
+ // FIXME
+ return null;
+ }
+
+ /**
+ * Cancel threads if they haven't finished.
+ */
+ @Override
+ public void cleanup(Exception e) {
+ taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
+ }
+
+ /**
+ * Hooray!
+ */
+ public void releaseBarrier() {
+ // NO OP
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
new file mode 100644
index 0000000..1ca638c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+/**
+ * Handle running each of the individual tasks for completing a backup procedure
+ * on a regionserver.
+ */
+public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
+ private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class);
+
+ /** Maximum number of concurrent snapshot region tasks that can run concurrently */
+ private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
+ private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
+
+ private final ExecutorCompletionService<Void> taskPool;
+ private final ThreadPoolExecutor executor;
+ private volatile boolean aborted;
+ private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
+ private final String name;
+
+ public LogRollBackupSubprocedurePool(String name, Configuration conf) {
+ // configure the executor service
+ long keepAlive =
+ conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
+ LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
+ this.name = name;
+ executor =
+ new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name
+ + ")-backup-pool"));
+ taskPool = new ExecutorCompletionService<Void>(executor);
+ }
+
+ /**
+ * Submit a task to the pool.
+ */
+ public void submitTask(final Callable<Void> task) {
+ Future<Void> f = this.taskPool.submit(task);
+ futures.add(f);
+ }
+
+ /**
+ * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
+ * @return <tt>true</tt> on success, <tt>false</tt> otherwise
+ * @throws ForeignException exception
+ */
+ public boolean waitForOutstandingTasks() throws ForeignException {
+ LOG.debug("Waiting for backup procedure to finish.");
+
+ try {
+ for (Future<Void> f : futures) {
+ f.get();
+ }
+ return true;
+ } catch (InterruptedException e) {
+ if (aborted) {
+ throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
+ e);
+ }
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof ForeignException) {
+ throw (ForeignException) e.getCause();
+ }
+ throw new ForeignException(name, e.getCause());
+ } finally {
+ // close off remaining tasks
+ for (Future<Void> f : futures) {
+ if (!f.isDone()) {
+ f.cancel(true);
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
+ * finish
+ */
+ @Override
+ public void close() {
+ executor.shutdown();
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ if (this.aborted) {
+ return;
+ }
+
+ this.aborted = true;
+ LOG.warn("Aborting because: " + why, e);
+ this.executor.shutdownNow();
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
new file mode 100644
index 0000000..aca190c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * This manager class handles the work dealing with backup for a {@link HRegionServer}.
+ * <p>
+ * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that is
+ * responsible by this region server. If any failures occur with the subprocedure, the manager's
+ * procedure member notifies the procedure coordinator to abort all others.
+ * <p>
+ * On startup, requires {@link #start()} to be called.
+ * <p>
+ * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be
+ * called
+ */
+public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager {
+
+ private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class);
+
+ /** Conf key for number of request threads to start backup on regionservers */
+ public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
+ /** # of threads for backup work on the rs. */
+ public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
+
+ public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
+ public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+ /** Conf key for millis between checks to see if backup work completed or if there are errors */
+ public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
+ /** Default amount of time to check for errors while regions finish backup work */
+ private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+ private RegionServerServices rss;
+ private ProcedureMemberRpcs memberRpcs;
+ private ProcedureMember member;
+
+ /**
+ * Create a default backup procedure manager
+ */
+ public LogRollRegionServerProcedureManager() {
+ }
+
+ /**
+ * Start accepting backup procedure requests.
+ */
+ @Override
+ public void start() {
+ this.memberRpcs.start(rss.getServerName().toString(), member);
+ LOG.info("Started region server backup manager.");
+ }
+
+ /**
+ * Close <tt>this</tt> and all running backup procedure tasks
+ * @param force forcefully stop all running tasks
+ * @throws IOException exception
+ */
+ @Override
+ public void stop(boolean force) throws IOException {
+ String mode = force ? "abruptly" : "gracefully";
+ LOG.info("Stopping RegionServerBackupManager " + mode + ".");
+
+ try {
+ this.member.close();
+ } finally {
+ this.memberRpcs.close();
+ }
+ }
+
+ /**
+ * If in a running state, creates the specified subprocedure for handling a backup procedure.
+ * @return Subprocedure to submit to the ProcedureMemeber.
+ */
+ public Subprocedure buildSubprocedure() {
+
+ // don't run a backup if the parent is stop(ping)
+ if (rss.isStopping() || rss.isStopped()) {
+ throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
+ + ", because stopping/stopped!");
+ }
+
+ LOG.info("Attempting to run a roll log procedure for backup.");
+ ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
+ Configuration conf = rss.getConfiguration();
+ long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ long wakeMillis =
+ conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
+
+ LogRollBackupSubprocedurePool taskManager =
+ new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf);
+ return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
+ taskManager);
+
+ }
+
+ /**
+ * Build the actual backup procedure runner that will do all the 'hard' work
+ */
+ public class BackupSubprocedureBuilder implements SubprocedureFactory {
+
+ @Override
+ public Subprocedure buildSubprocedure(String name, byte[] data) {
+ return LogRollRegionServerProcedureManager.this.buildSubprocedure();
+ }
+ }
+
+ @Override
+ public void initialize(RegionServerServices rss) throws IOException {
+ this.rss = rss;
+ BaseCoordinatedStateManager coordManager =
+ (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.getCoordinatedStateManager(rss
+ .getConfiguration());
+ coordManager.initialize(rss);
+ this.memberRpcs =
+ coordManager
+ .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
+
+ // read in the backup handler configuration properties
+ Configuration conf = rss.getConfiguration();
+ long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
+ // create the actual cohort member
+ ThreadPoolExecutor pool =
+ ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
+ this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
+ }
+
+ @Override
+ public String getProcedureSignature() {
+ return "backup-proc";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index ae36f08..3342743 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.coordination;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Server;
@@ -51,8 +55,21 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
* Method to retrieve coordination for split log worker
*/
public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination();
+
/**
* Method to retrieve coordination for split log manager
*/
public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
+ /**
+ * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
+ */
+ public abstract ProcedureCoordinatorRpcs
+ getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
+
+ /**
+ * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpc}
+ */
+ public abstract ProcedureMemberRpcs
+ getProcedureMemberRpcs(String procType) throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 3e89be7..7cf4aab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -17,9 +17,15 @@
*/
package org.apache.hadoop.hbase.coordination;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@@ -49,9 +55,21 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
@Override
public SplitLogWorkerCoordination getSplitLogWorkerCoordination() {
return splitLogWorkerCoordination;
- }
+ }
+
@Override
public SplitLogManagerCoordination getSplitLogManagerCoordination() {
return splitLogManagerCoordination;
}
+
+ @Override
+ public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
+ throws IOException {
+ return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode);
+ }
+
+ @Override
+ public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws IOException {
+ return new ZKProcedureMemberRpcs(watcher, procType);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 9d9cee0..2ceeda5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -85,6 +85,9 @@ public class WALPlayer extends Configured implements Tool {
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+ public WALPlayer(){
+ }
+
protected WALPlayer(final Configuration c) {
super(c);
}
@@ -94,7 +97,7 @@ public class WALPlayer extends Configured implements Tool {
* This one can be used together with {@link KeyValueSortReducer}
*/
static class WALKeyValueMapper
- extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+ extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
private byte[] table;
@Override
@@ -106,7 +109,9 @@ public class WALPlayer extends Configured implements Tool {
if (Bytes.equals(table, key.getTablename().getName())) {
for (Cell cell : value.getCells()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- if (WALEdit.isMetaEditFamily(kv)) continue;
+ if (WALEdit.isMetaEditFamily(kv)) {
+ continue;
+ }
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
}
}
@@ -132,7 +137,7 @@ public class WALPlayer extends Configured implements Tool {
* a running HBase instance.
*/
protected static class WALMapper
- extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
+ extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
@Override
@@ -149,7 +154,9 @@ public class WALPlayer extends Configured implements Tool {
Cell lastCell = null;
for (Cell cell : value.getCells()) {
// filtering WAL meta entries
- if (WALEdit.isMetaEditFamily(cell)) continue;
+ if (WALEdit.isMetaEditFamily(cell)) {
+ continue;
+ }
// Allow a subclass filter out this cell.
if (filter(context, cell)) {
@@ -160,8 +167,12 @@ public class WALPlayer extends Configured implements Tool {
if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
|| !CellUtil.matchingRow(lastCell, cell)) {
// row or type changed, write out aggregate KVs.
- if (put != null) context.write(tableOut, put);
- if (del != null) context.write(tableOut, del);
+ if (put != null) {
+ context.write(tableOut, put);
+ }
+ if (del != null) {
+ context.write(tableOut, del);
+ }
if (CellUtil.isDelete(cell)) {
del = new Delete(CellUtil.cloneRow(cell));
} else {
@@ -177,8 +188,12 @@ public class WALPlayer extends Configured implements Tool {
lastCell = cell;
}
// write residual KVs
- if (put != null) context.write(tableOut, put);
- if (del != null) context.write(tableOut, del);
+ if (put != null) {
+ context.write(tableOut, put);
+ }
+ if (del != null) {
+ context.write(tableOut, del);
+ }
}
} catch (InterruptedException e) {
e.printStackTrace();
@@ -186,7 +201,8 @@ public class WALPlayer extends Configured implements Tool {
}
/**
- * @param cell
+ * Filter cell
+ * @param cell cell
* @return Return true if we are to emit this cell.
*/
protected boolean filter(Context context, final Cell cell) {
@@ -197,9 +213,7 @@ public class WALPlayer extends Configured implements Tool {
public void setup(Context context) throws IOException {
String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
- if (tablesToUse == null && tableMap == null) {
- // Then user wants all tables.
- } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
+ if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
// this can only happen when WALMapper is used directly by a class other than WALPlayer
throw new IOException("No tables or incorrect table mapping specified.");
}
@@ -215,7 +229,9 @@ public class WALPlayer extends Configured implements Tool {
void setupTime(Configuration conf, String option) throws IOException {
String val = conf.get(option);
- if (null == val) return;
+ if (null == val) {
+ return;
+ }
long ms;
try {
// first try to parse in user friendly form
@@ -295,7 +311,8 @@ public class WALPlayer extends Configured implements Tool {
return job;
}
- /*
+ /**
+ * Print usage
* @param errorMsg Error message. Can be null.
*/
private void usage(final String errorMsg) {
@@ -305,7 +322,8 @@ public class WALPlayer extends Configured implements Tool {
System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
System.err.println("Read all WAL entries for <tables>.");
System.err.println("If no tables (\"\") are specific, all tables are imported.");
- System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)");
+ System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported"+
+ " in that case.)");
System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
System.err.println("<tableMapping> is a command separated list of targettables.");
@@ -318,10 +336,10 @@ public class WALPlayer extends Configured implements Tool {
System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
System.err.println(" -D " + JOB_NAME_CONF_KEY
- + "=jobName - use the specified mapreduce job name for the wal player");
+ + "=jobName - use the specified mapreduce job name for the wal player");
System.err.println("For performance also consider the following options:\n"
- + " -Dmapreduce.map.speculative=false\n"
- + " -Dmapreduce.reduce.speculative=false");
+ + " -Dmapreduce.map.speculative=false\n"
+ + " -Dmapreduce.reduce.speculative=false");
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b4bffb4..89cfd18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -390,6 +391,7 @@ public class HMaster extends HRegionServer implements MasterServices {
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
Replication.decorateMasterConfiguration(this.conf);
+ BackupManager.decorateMasterConfiguration(this.conf);
// Hack! Maps DFSClient => Master for logs. HDFS made this
// config param for task trackers, but we can piggyback off of it.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
index 95c3ffe..b6e11ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
@@ -37,7 +37,7 @@ public abstract class RegionServerProcedureManager extends ProcedureManager {
* @param rss Region Server service interface
* @throws KeeperException
*/
- public abstract void initialize(RegionServerServices rss) throws KeeperException;
+ public abstract void initialize(RegionServerServices rss) throws IOException;
/**
* Start accepting procedure requests.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
index 0f4ea64..adb3604 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
-import org.apache.zookeeper.KeeperException;
/**
* Provides the globally barriered procedure framework and environment
@@ -39,7 +38,7 @@ public class RegionServerProcedureManagerHost extends
private static final Log LOG = LogFactory
.getLog(RegionServerProcedureManagerHost.class);
- public void initialize(RegionServerServices rss) throws KeeperException {
+ public void initialize(RegionServerServices rss) throws IOException {
for (RegionServerProcedureManager proc : procedures) {
LOG.debug("Procedure " + proc.getProcedureSignature() + " is initializing");
proc.initialize(rss);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
index 085d642..3865ba9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
@@ -54,7 +54,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
* @throws KeeperException if an unexpected zk error occurs
*/
public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
- String procedureClass, String coordName) throws KeeperException {
+ String procedureClass, String coordName) throws IOException {
this.watcher = watcher;
this.procedureType = procedureClass;
this.coordName = coordName;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
index 2e03a60..9b491fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
@@ -68,49 +68,53 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
* @throws KeeperException if we can't reach zookeeper
*/
public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
- throws KeeperException {
- this.zkController = new ZKProcedureUtil(watcher, procType) {
- @Override
- public void nodeCreated(String path) {
- if (!isInProcedurePath(path)) {
- return;
- }
+ throws IOException {
+ try {
+ this.zkController = new ZKProcedureUtil(watcher, procType) {
+ @Override
+ public void nodeCreated(String path) {
+ if (!isInProcedurePath(path)) {
+ return;
+ }
- LOG.info("Received created event:" + path);
- // if it is a simple start/end/abort then we just rewatch the node
- if (isAcquiredNode(path)) {
- waitForNewProcedures();
- return;
- } else if (isAbortNode(path)) {
- watchForAbortedProcedures();
- return;
+ LOG.info("Received created event:" + path);
+ // if it is a simple start/end/abort then we just rewatch the node
+ if (isAcquiredNode(path)) {
+ waitForNewProcedures();
+ return;
+ } else if (isAbortNode(path)) {
+ watchForAbortedProcedures();
+ return;
+ }
+ String parent = ZKUtil.getParent(path);
+ // if its the end barrier, the procedure can be completed
+ if (isReachedNode(parent)) {
+ receivedReachedGlobalBarrier(path);
+ return;
+ } else if (isAbortNode(parent)) {
+ abort(path);
+ return;
+ } else if (isAcquiredNode(parent)) {
+ startNewSubprocedure(path);
+ } else {
+ LOG.debug("Ignoring created notification for node:" + path);
+ }
}
- String parent = ZKUtil.getParent(path);
- // if its the end barrier, the procedure can be completed
- if (isReachedNode(parent)) {
- receivedReachedGlobalBarrier(path);
- return;
- } else if (isAbortNode(parent)) {
- abort(path);
- return;
- } else if (isAcquiredNode(parent)) {
- startNewSubprocedure(path);
- } else {
- LOG.debug("Ignoring created notification for node:" + path);
- }
- }
- @Override
- public void nodeChildrenChanged(String path) {
- if (path.equals(this.acquiredZnode)) {
- LOG.info("Received procedure start children changed event: " + path);
- waitForNewProcedures();
- } else if (path.equals(this.abortZnode)) {
- LOG.info("Received procedure abort children changed event: " + path);
- watchForAbortedProcedures();
+ @Override
+ public void nodeChildrenChanged(String path) {
+ if (path.equals(this.acquiredZnode)) {
+ LOG.info("Received procedure start children changed event: " + path);
+ waitForNewProcedures();
+ } else if (path.equals(this.abortZnode)) {
+ LOG.info("Received procedure abort children changed event: " + path);
+ watchForAbortedProcedures();
+ }
}
- }
- };
+ };
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
}
public ZKProcedureUtil getZkController() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index 1aa959c..bd65cc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -317,7 +317,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
* @throws KeeperException if the zookeeper cannot be reached
*/
@Override
- public void initialize(RegionServerServices rss) throws KeeperException {
+ public void initialize(RegionServerServices rss) throws IOException {
this.rss = rss;
ZooKeeperWatcher zkw = rss.getZooKeeper();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4ab2693..0ce8ee4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -828,8 +828,8 @@ public class HRegionServer extends HasThread implements
rspmHost = new RegionServerProcedureManagerHost();
rspmHost.loadProcedures(conf);
rspmHost.initialize(this);
- } catch (KeeperException e) {
- this.abort("Failed to reach zk cluster when creating procedure handler.", e);
+ } catch (IOException e) {
+ this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
}
// register watcher for recovering regions
this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
index 537329a..e56dd28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
@@ -390,7 +390,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
* @throws KeeperException if the zookeeper cluster cannot be reached
*/
@Override
- public void initialize(RegionServerServices rss) throws KeeperException {
+ public void initialize(RegionServerServices rss) throws IOException {
this.rss = rss;
ZooKeeperWatcher zkw = rss.getZooKeeper();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index f3f869c..31f05c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -96,6 +96,8 @@ import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+
+
/**
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
* Only one WAL is ever being written at a time. When a WAL hits a configured maximum size,
@@ -356,7 +358,9 @@ public class FSHLog implements WAL {
public int compare(Path o1, Path o2) {
long t1 = getFileNumFromFileName(o1);
long t2 = getFileNumFromFileName(o2);
- if (t1 == t2) return 0;
+ if (t1 == t2) {
+ return 0;
+ }
return (t1 > t2) ? 1 : -1;
}
};
@@ -399,7 +403,7 @@ public class FSHLog implements WAL {
* @param root path for stored and archived wals
* @param logDir dir where wals are stored
* @param conf configuration to use
- * @throws IOException
+ * @throws IOException exception
*/
public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
throws IOException {
@@ -407,7 +411,7 @@ public class FSHLog implements WAL {
}
/**
- * Create an edit log at the given <code>dir</code> location.
+ * Create an edit log at the given directory location.
*
* You should never have to load an existing log. If there is a log at
* startup, it should have already been processed and deleted by the time the
@@ -422,13 +426,13 @@ public class FSHLog implements WAL {
* be registered before we do anything else; e.g. the
* Constructor {@link #rollWriter()}.
* @param failIfWALExists If true IOException will be thrown if files related to this wal
- * already exist.
+ * already exist.
* @param prefix should always be hostname and port in distributed env and
- * it will be URL encoded before being used.
- * If prefix is null, "wal" will be used
+ * it will be URL encoded before being used.
+ * If prefix is null, "wal" will be used
* @param suffix will be url encoded. null is treated as empty. non-empty must start with
- * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
- * @throws IOException
+ * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
+ * @throws IOException exception
*/
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
final String archiveDir, final Configuration conf,
@@ -590,7 +594,9 @@ public class FSHLog implements WAL {
@VisibleForTesting
OutputStream getOutputStream() {
FSDataOutputStream fsdos = this.hdfs_out;
- if (fsdos == null) return null;
+ if (fsdos == null) {
+ return null;
+ }
return fsdos.getWrappedStream();
}
@@ -625,7 +631,7 @@ public class FSHLog implements WAL {
/**
* Tell listeners about pre log roll.
- * @throws IOException
+ * @throws IOException exception
*/
private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
throws IOException {
@@ -638,7 +644,7 @@ public class FSHLog implements WAL {
/**
* Tell listeners about post log roll.
- * @throws IOException
+ * @throws IOException exception
*/
private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
throws IOException {
@@ -651,8 +657,7 @@ public class FSHLog implements WAL {
/**
* Run a sync after opening to set up the pipeline.
- * @param nextWriter
- * @param startTimeNanos
+ * @param nextWriter next writer
*/
private void preemptiveSync(final ProtobufLogWriter nextWriter) {
long startTimeNanos = System.nanoTime();
@@ -670,7 +675,9 @@ public class FSHLog implements WAL {
rollWriterLock.lock();
try {
// Return if nothing to flush.
- if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
+ if (!force && (this.writer != null && this.numEntries.get() <= 0)) {
+ return null;
+ }
byte [][] regionsToFlush = null;
if (this.closed) {
LOG.debug("WAL closed. Skipping rolling of writer");
@@ -725,7 +732,7 @@ public class FSHLog implements WAL {
/**
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
- * @throws IOException
+ * @throws IOException exception
*/
private void cleanOldLogs() throws IOException {
List<Path> logsToArchive = null;
@@ -735,9 +742,13 @@ public class FSHLog implements WAL {
Path log = e.getKey();
Map<byte[], Long> sequenceNums = e.getValue();
if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
- if (logsToArchive == null) logsToArchive = new ArrayList<Path>();
+ if (logsToArchive == null) {
+ logsToArchive = new ArrayList<Path>();
+ }
logsToArchive.add(log);
- if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("WAL file ready for archiving " + log);
+ }
}
}
if (logsToArchive != null) {
@@ -767,7 +778,9 @@ public class FSHLog implements WAL {
if (regions != null) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < regions.length; i++) {
- if (i > 0) sb.append(", ");
+ if (i > 0) {
+ sb.append(", ");
+ }
sb.append(Bytes.toStringBinary(regions[i]));
}
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
@@ -833,7 +846,9 @@ public class FSHLog implements WAL {
}
} catch (FailedSyncBeforeLogCloseException e) {
// If unflushed/unsynced entries on close, it is reason to abort.
- if (isUnflushedEntries()) throw e;
+ if (isUnflushedEntries()) {
+ throw e;
+ }
LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
e.getMessage());
}
@@ -894,7 +909,9 @@ public class FSHLog implements WAL {
try {
blockOnSync(syncFuture);
} catch (IOException ioe) {
- if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stale sync exception", ioe);
+ }
}
}
}
@@ -965,7 +982,15 @@ public class FSHLog implements WAL {
public Path getCurrentFileName() {
return computeFilename(this.filenum.get());
}
-
+
+ /**
+ * To support old API compatibility
+ * @return current file number (timestamp)
+ */
+ public long getFilenum() {
+ return filenum.get();
+ }
+
@Override
public String toString() {
return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LogUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LogUtils.java
new file mode 100644
index 0000000..26f261c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LogUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public final class LogUtils {
+
+ private LogUtils() {
+ }
+ /**
+ * Disables Zk- and HBase client logging
+ * @param log
+ */
+ public static void disableUselessLoggers(Log log) {
+ // disable zookeeper log to avoid it mess up command output
+ Logger zkLogger = Logger.getLogger("org.apache.zookeeper");
+ zkLogger.setLevel(Level.OFF);
+ // disable hbase zookeeper tool log to avoid it mess up command output
+ Logger hbaseZkLogger = Logger.getLogger("org.apache.hadoop.hbase.zookeeper");
+ hbaseZkLogger.setLevel(Level.OFF);
+ // disable hbase client log to avoid it mess up command output
+ Logger hbaseClientLogger = Logger.getLogger("org.apache.hadoop.hbase.client");
+ hbaseClientLogger.setLevel(Level.OFF);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index 027e7a2..dd4d337 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -209,13 +209,18 @@ public class DefaultWALProvider implements WALProvider {
@VisibleForTesting
public static long extractFileNumFromWAL(final WAL wal) {
final Path walName = ((FSHLog)wal).getCurrentFileName();
+ return extractFileNumFromWAL(walName);
+ }
+
+ @VisibleForTesting
+ public static long extractFileNumFromWAL(final Path walName) {
if (walName == null) {
throw new IllegalArgumentException("The WAL path couldn't be null");
}
final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2:1)]);
}
-
+
/**
* Pattern used to validate a WAL file name
* see {@link #validateWALFilename(String)} for description.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
new file mode 100644
index 0000000..84b7c78
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -0,0 +1,209 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState;
+import org.apache.hadoop.hbase.backup.impl.BackupContext;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * This class is only a base for other integration-level backup tests. Do not add tests here.
+ * TestBackupSmallTests is where tests that don't require bring machines up/down should go All other
+ * tests should have their own classes and extend this one
+ */
+public class TestBackupBase {
+
+ private static final Log LOG = LogFactory.getLog(TestBackupBase.class);
+
+ protected static Configuration conf1;
+ protected static Configuration conf2;
+
+ protected static HBaseTestingUtility TEST_UTIL;
+ protected static HBaseTestingUtility TEST_UTIL2;
+ protected static TableName table1;
+ protected static TableName table2;
+ protected static TableName table3;
+ protected static TableName table4;
+
+ protected static TableName table1_restore = TableName.valueOf("table1_restore");
+ protected static TableName table2_restore = TableName.valueOf("table2_restore");
+ protected static TableName table3_restore = TableName.valueOf("table3_restore");
+ protected static TableName table4_restore = TableName.valueOf("table4_restore");
+
+ protected static final int NB_ROWS_IN_BATCH = 100;
+ protected static final byte[] qualName = Bytes.toBytes("q1");
+ protected static final byte[] famName = Bytes.toBytes("f");
+
+ protected static String BACKUP_ROOT_DIR = "/backupUT";
+ protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT";
+
+ protected static final String BACKUP_ZNODE = "/backup/hbase";
+ protected static final String BACKUP_SUCCEED_NODE = "complete";
+ protected static final String BACKUP_FAILED_NODE = "failed";
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ TEST_UTIL.getConfiguration().set("hbase.procedure.regionserver.classes",
+ LogRollRegionServerProcedureManager.class.getName());
+ TEST_UTIL.getConfiguration().set("hbase.procedure.master.classes",
+ LogRollMasterProcedureManager.class.getName());
+ TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ TEST_UTIL.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
+
+ conf1 = TEST_UTIL.getConfiguration();
+ conf2 = HBaseConfiguration.create(conf1);
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+ TEST_UTIL2 = new HBaseTestingUtility(conf2);
+ TEST_UTIL2.setZkCluster(miniZK);
+ TEST_UTIL.startMiniCluster();
+ TEST_UTIL2.startMiniCluster();
+ conf1 = TEST_UTIL.getConfiguration();
+
+ TEST_UTIL.startMiniMapReduceCluster();
+ BACKUP_ROOT_DIR = TEST_UTIL.getConfiguration().get("fs.defaultFS") + "/backupUT";
+ LOG.info("ROOTDIR " + BACKUP_ROOT_DIR);
+ BACKUP_REMOTE_ROOT_DIR = TEST_UTIL2.getConfiguration().get("fs.defaultFS") + "/backupUT";
+ LOG.info("REMOTE ROOTDIR " + BACKUP_REMOTE_ROOT_DIR);
+
+ createTables();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
+ SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+ TEST_UTIL2.shutdownMiniCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ }
+
+ protected static void loadTable(HTable table) throws Exception {
+
+ Put p; // 100 + 1 row to t1_syncup
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ table.put(p);
+ }
+ }
+
+ protected static void createTables() throws Exception {
+
+ long tid = System.currentTimeMillis();
+ table1 = TableName.valueOf("test-" + tid);
+ HBaseAdmin ha = TEST_UTIL.getHBaseAdmin();
+ HTableDescriptor desc = new HTableDescriptor(table1);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ desc.addFamily(fam);
+ ha.createTable(desc);
+ Connection conn = ConnectionFactory.createConnection(conf1);
+ HTable table = (HTable) conn.getTable(table1);
+ loadTable(table);
+ table.close();
+ table2 = TableName.valueOf("test-" + tid + 1);
+ desc = new HTableDescriptor(table2);
+ desc.addFamily(fam);
+ ha.createTable(desc);
+ table = (HTable) conn.getTable(table2);
+ loadTable(table);
+ table.close();
+ table3 = TableName.valueOf("test-" + tid + 2);
+ table = TEST_UTIL.createTable(table3, famName);
+ table.close();
+ table4 = TableName.valueOf("test-" + tid + 3);
+ table = TEST_UTIL.createTable(table4, famName);
+ table.close();
+ ha.close();
+ conn.close();
+ }
+
+ protected boolean checkSucceeded(String backupId) throws IOException {
+ BackupContext status = getBackupContext(backupId);
+ if (status == null) return false;
+ return status.getState() == BackupState.COMPLETE;
+ }
+
+ protected boolean checkFailed(String backupId) throws IOException {
+ BackupContext status = getBackupContext(backupId);
+ if (status == null) return false;
+ return status.getState() == BackupState.FAILED;
+ }
+
+ private BackupContext getBackupContext(String backupId) throws IOException {
+ Configuration conf = conf1;//BackupClientImpl.getConf();
+ try (Connection connection = ConnectionFactory.createConnection(conf);
+ BackupSystemTable table = new BackupSystemTable(connection)) {
+ BackupContext status = table.readBackupStatus(backupId);
+ return status;
+ }
+ }
+
+ protected BackupClient getBackupClient(){
+ return BackupRestoreFactory.getBackupClient(conf1);
+ }
+
+ protected RestoreClient getRestoreClient()
+ {
+ return BackupRestoreFactory.getRestoreClient(conf1);
+ }
+
+ /**
+ * Helper method
+ */
+ protected List<TableName> toList(String... args){
+ List<TableName> ret = new ArrayList<>();
+ for(int i=0; i < args.length; i++){
+ ret.add(TableName.valueOf(args[i]));
+ }
+ return ret;
+ }
+}