You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/20 17:41:48 UTC
[15/28] hbase git commit: HBASE-14123 HBase Backup/Restore Phase 2
(Vladimir Rodionov)
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
new file mode 100644
index 0000000..e90d5c1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple MR input format for HFiles.
+ * This code was borrowed from Apache Crunch project.
+ * Updated to the recent version of HBase.
+ */
+public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);
+
+ /**
+ * File filter that removes all "hidden" files. This might be something worth removing from
+ * a more general purpose utility; it accounts for the presence of metadata files created
+ * in the way we're doing exports.
+ */
+ static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /**
+ * Record reader for HFiles.
+ */
+ private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {
+
+ private Reader in;
+ protected Configuration conf;
+ private HFileScanner scanner;
+
+ /**
+ * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
+ */
+ private Cell value = null;
+ private long count;
+ private boolean seeked = false;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ FileSplit fileSplit = (FileSplit) split;
+ conf = context.getConfiguration();
+ Path path = fileSplit.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ LOG.info("Initialize HFileRecordReader for {}", path);
+ this.in = HFile.createReader(fs, path, conf);
+
+ // The file info must be loaded before the scanner can be used.
+ // This seems like a bug in HBase, but it's easily worked around.
+ this.in.loadFileInfo();
+ this.scanner = in.getScanner(false, false);
+
+ }
+
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ boolean hasNext;
+ if (!seeked) {
+ LOG.info("Seeking to start");
+ hasNext = scanner.seekTo();
+ seeked = true;
+ } else {
+ hasNext = scanner.next();
+ }
+ if (!hasNext) {
+ return false;
+ }
+ value = scanner.getCell();
+ count++;
+ return true;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public Cell getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
+ // the start row, but better than nothing anyway.
+ return 1.0f * count / in.getEntries();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ }
+ }
+
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+
+ // Explode out directories that match the original FileInputFormat filters
+ // since HFiles are written to directories where the
+ // directory name is the column name
+ for (FileStatus status : super.listStatus(job)) {
+ if (status.isDirectory()) {
+ FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
+ for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+ result.add(match);
+ }
+ } else {
+ result.add(status);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new HFileRecordReader();
+ }
+
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
+ // This file isn't splittable.
+ return false;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 718e88b..f59e24c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -100,7 +100,6 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/**
* Tool to load the output of HFileOutputFormat into an existing table.
*/
@@ -116,7 +115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
= "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
public final static String CREATE_TABLE_CONF_KEY = "create.table";
- public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
+ public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
public final static String ALWAYS_COPY_FILES = "always.copy.files";
// We use a '.' prefix which is ignored when walking directory trees
@@ -168,7 +167,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
+ CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
+ " Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
- + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
+ + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
+ "\n");
}
@@ -530,7 +529,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
boolean success = false;
try {
LOG.debug("Going to connect to server " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
+ + Bytes.toStringBinary(getRow()) + " with hfile group " +
+ LoadIncrementalHFiles.this.toString( famPaths));
byte[] regionName = getLocation().getRegionInfo().getRegionName();
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(getConf(), table);
@@ -1047,6 +1047,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
}
+ private final String toString(List<Pair<byte[], String>> list) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("[");
+ if(list != null){
+ for(Pair<byte[], String> pair: list) {
+ sb.append("{");
+ sb.append(Bytes.toStringBinary(pair.getFirst()));
+ sb.append(",");
+ sb.append(pair.getSecond());
+ sb.append("}");
+ }
+ }
+ sb.append("]");
+ return sb.toString();
+ }
private boolean isSecureBulkLoadEndpointAvailable() {
String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
@@ -1245,7 +1260,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
- boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
+ boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
if (dirPath != null) {
doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 8514ace..0ca78b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -27,22 +27,26 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
/**
* Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
@@ -169,7 +173,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
temp = reader.next(currentEntry);
i++;
} catch (EOFException x) {
- LOG.info("Corrupted entry detected. Ignoring the rest of the file."
+ LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
+ " (This is normal when a RegionServer crashed.)");
return false;
}
@@ -231,29 +235,37 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
- Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir"));
-
+ Path[] inputPaths = getInputPaths(conf);
long startTime = conf.getLong(startKey, Long.MIN_VALUE);
long endTime = conf.getLong(endKey, Long.MAX_VALUE);
- FileSystem fs = inputDir.getFileSystem(conf);
- List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
-
- List<InputSplit> splits = new ArrayList<>(files.size());
- for (FileStatus file : files) {
+ List<FileStatus> allFiles = new ArrayList<FileStatus>();
+ for(Path inputPath: inputPaths){
+ FileSystem fs = inputPath.getFileSystem(conf);
+ List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
+ allFiles.addAll(files);
+ }
+ List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
+ for (FileStatus file : allFiles) {
splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
}
return splits;
}
+ private Path[] getInputPaths(Configuration conf) {
+ String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir");
+ return StringUtils.stringToPath(inpDirs.split(","));
+ }
+
private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
throws IOException {
List<FileStatus> result = new ArrayList<>();
LOG.debug("Scanning " + dir.toString() + " for WAL files");
- FileStatus[] files = fs.listStatus(dir);
- if (files == null) return Collections.emptyList();
- for (FileStatus file : files) {
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
+ if (!iter.hasNext()) return Collections.emptyList();
+ while (iter.hasNext()) {
+ LocatedFileStatus file = iter.next();
if (file.isDirectory()) {
// recurse into sub directories
result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
@@ -264,7 +276,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
try {
long fileStartTime = Long.parseLong(name.substring(idx+1));
if (fileStartTime <= endTime) {
- LOG.info("Found: " + name);
+ LOG.info("Found: " + file);
result.add(file);
}
} catch (NumberFormatException x) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index cca2041..d16dcf5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -70,9 +70,9 @@ import org.apache.hadoop.util.ToolRunner;
public class WALPlayer extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(WALPlayer.class);
final static String NAME = "WALPlayer";
- final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
- final static String TABLES_KEY = "wal.input.tables";
- final static String TABLE_MAP_KEY = "wal.input.tablesmap";
+ public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
+ public final static String TABLES_KEY = "wal.input.tables";
+ public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
// This relies on Hadoop Configuration to handle warning about deprecated configs and
// to set the correct non-deprecated configs when an old one shows up.
@@ -84,6 +84,9 @@ public class WALPlayer extends Configured implements Tool {
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+ public WALPlayer(){
+ }
+
protected WALPlayer(final Configuration c) {
super(c);
}
@@ -93,7 +96,7 @@ public class WALPlayer extends Configured implements Tool {
* This one can be used together with {@link KeyValueSortReducer}
*/
static class WALKeyValueMapper
- extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+ extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
private byte[] table;
@Override
@@ -105,7 +108,9 @@ public class WALPlayer extends Configured implements Tool {
if (Bytes.equals(table, key.getTablename().getName())) {
for (Cell cell : value.getCells()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- if (WALEdit.isMetaEditFamily(kv)) continue;
+ if (WALEdit.isMetaEditFamily(kv)) {
+ continue;
+ }
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
}
}
@@ -148,7 +153,9 @@ public class WALPlayer extends Configured implements Tool {
Cell lastCell = null;
for (Cell cell : value.getCells()) {
// filtering WAL meta entries
- if (WALEdit.isMetaEditFamily(cell)) continue;
+ if (WALEdit.isMetaEditFamily(cell)) {
+ continue;
+ }
// Allow a subclass filter out this cell.
if (filter(context, cell)) {
@@ -159,8 +166,12 @@ public class WALPlayer extends Configured implements Tool {
if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
|| !CellUtil.matchingRow(lastCell, cell)) {
// row or type changed, write out aggregate KVs.
- if (put != null) context.write(tableOut, put);
- if (del != null) context.write(tableOut, del);
+ if (put != null) {
+ context.write(tableOut, put);
+ }
+ if (del != null) {
+ context.write(tableOut, del);
+ }
if (CellUtil.isDelete(cell)) {
del = new Delete(CellUtil.cloneRow(cell));
} else {
@@ -176,31 +187,41 @@ public class WALPlayer extends Configured implements Tool {
lastCell = cell;
}
// write residual KVs
- if (put != null) context.write(tableOut, put);
- if (del != null) context.write(tableOut, del);
+ if (put != null) {
+ context.write(tableOut, put);
+ }
+ if (del != null) {
+ context.write(tableOut, del);
+ }
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- /**
- * @param cell
- * @return Return true if we are to emit this cell.
- */
protected boolean filter(Context context, final Cell cell) {
return true;
}
@Override
+ protected void
+ cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
+ throws IOException, InterruptedException {
+ super.cleanup(context);
+ }
+
+ @Override
public void setup(Context context) throws IOException {
String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
- if (tablesToUse == null && tableMap == null) {
+ if (tableMap == null) {
+ tableMap = tablesToUse;
+ }
+ if (tablesToUse == null) {
// Then user wants all tables.
- } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
+ } else if (tablesToUse.length != tableMap.length) {
// this can only happen when WALMapper is used directly by a class other than WALPlayer
- throw new IOException("No tables or incorrect table mapping specified.");
+ throw new IOException("Incorrect table mapping specified .");
}
int i = 0;
if (tablesToUse != null) {
@@ -214,7 +235,9 @@ public class WALPlayer extends Configured implements Tool {
void setupTime(Configuration conf, String option) throws IOException {
String val = conf.get(option);
- if (null == val) return;
+ if (null == val) {
+ return;
+ }
long ms;
try {
// first try to parse in user friendly form
@@ -243,7 +266,7 @@ public class WALPlayer extends Configured implements Tool {
Configuration conf = getConf();
setupTime(conf, WALInputFormat.START_TIME_KEY);
setupTime(conf, WALInputFormat.END_TIME_KEY);
- Path inputDir = new Path(args[0]);
+ String inputDirs = args[0];
String[] tables = args[1].split(",");
String[] tableMap;
if (args.length > 2) {
@@ -257,13 +280,18 @@ public class WALPlayer extends Configured implements Tool {
}
conf.setStrings(TABLES_KEY, tables);
conf.setStrings(TABLE_MAP_KEY, tableMap);
- Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir));
+ Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
job.setJarByClass(WALPlayer.class);
- FileInputFormat.setInputPaths(job, inputDir);
+
+ FileInputFormat.addInputPaths(job, inputDirs);
+
job.setInputFormatClass(WALInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
+ LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+
// the bulk HFile case
if (tables.length != 1) {
throw new IOException("Exactly one table must be specified for the bulk export option");
@@ -299,7 +327,9 @@ public class WALPlayer extends Configured implements Tool {
return job;
}
- /*
+
+ /**
+ * Print usage
* @param errorMsg Error message. Can be null.
*/
private void usage(final String errorMsg) {
@@ -309,7 +339,8 @@ public class WALPlayer extends Configured implements Tool {
System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
System.err.println("Read all WAL entries for <tables>.");
System.err.println("If no tables (\"\") are specific, all tables are imported.");
- System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)");
+ System.err.println("(Careful, even hbase:meta entries will be imported"+
+ " in that case.)");
System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
System.err.println("<tableMapping> is a command separated list of targettables.");
@@ -322,10 +353,10 @@ public class WALPlayer extends Configured implements Tool {
System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
System.err.println(" -D " + JOB_NAME_CONF_KEY
- + "=jobName - use the specified mapreduce job name for the wal player");
+ + "=jobName - use the specified mapreduce job name for the wal player");
System.err.println("For performance also consider the following options:\n"
- + " -Dmapreduce.map.speculative=false\n"
- + " -Dmapreduce.reduce.speculative=false");
+ + " -Dmapreduce.map.speculative=false\n"
+ + " -Dmapreduce.reduce.speculative=false");
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 177ee32..f86f800 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -74,10 +74,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegi
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
@@ -88,9 +86,129 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockH
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.*;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse.Capability;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -233,6 +351,7 @@ public class MasterRpcServices extends RSRpcServices
/**
* @return list of blocking services and their security info classes that this server supports
*/
+ @Override
protected List<BlockingServiceAndInterface> getServices() {
List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
bssi.add(new BlockingServiceAndInterface(
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index 54b68d3..55d58e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -62,7 +62,7 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
-import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
@@ -1114,7 +1114,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
// setup the default procedure coordinator
String name = master.getServerName().toString();
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
- ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
+ ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(
master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java
new file mode 100644
index 0000000..b656894
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
+ */
+@InterfaceAudience.Private
+public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs {
+ private static final Log LOG = LogFactory.getLog(ZKProcedureCoordinator.class);
+ private ZKProcedureUtil zkProc = null;
+ protected ProcedureCoordinator coordinator = null; // if started this should be non-null
+
+ ZooKeeperWatcher watcher;
+ String procedureType;
+ String coordName;
+
+ /**
+ * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
+ * @param procedureClass procedure type name is a category for when there are multiple kinds of
+ * procedures.-- this becomes a znode so be aware of the naming restrictions
+ * @param coordName name of the node running the coordinator
+ * @throws KeeperException if an unexpected zk error occurs
+ */
+ public ZKProcedureCoordinator(ZooKeeperWatcher watcher,
+ String procedureClass, String coordName) {
+ this.watcher = watcher;
+ this.procedureType = procedureClass;
+ this.coordName = coordName;
+ }
+
+ /**
+ * The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. If znodes
+ * appear, first acquire to relevant listener or sets watch waiting for notification of
+ * the acquire node
+ *
+ * @param proc the Procedure
+ * @param info data to be stored in the acquire node
+ * @param nodeNames children of the acquire phase
+ * @throws IOException if any failure occurs.
+ */
+ @Override
+ final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
+ throws IOException, IllegalArgumentException {
+ String procName = proc.getName();
+ // start watching for the abort node
+ String abortNode = zkProc.getAbortZNode(procName);
+ try {
+ // check to see if the abort node already exists
+ if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
+ abort(abortNode);
+ }
+ // If we get an abort node watch triggered here, we'll go complete creating the acquired
+ // znode but then handle the acquire znode and bail out
+ } catch (KeeperException e) {
+ String msg = "Failed while watching abort node:" + abortNode;
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+
+ // create the acquire barrier
+ String acquire = zkProc.getAcquiredBarrierNode(procName);
+ LOG.debug("Creating acquire znode:" + acquire);
+ try {
+ // notify all the procedure listeners to look for the acquire node
+ byte[] data = ProtobufUtil.prependPBMagic(info);
+ ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
+ // loop through all the children of the acquire phase and watch for them
+ for (String node : nodeNames) {
+ String znode = ZKUtil.joinZNode(acquire, node);
+ LOG.debug("Watching for acquire node:" + znode);
+ if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
+ coordinator.memberAcquiredBarrier(procName, node);
+ }
+ }
+ } catch (KeeperException e) {
+ String msg = "Failed while creating acquire node:" + acquire;
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
+
+ @Override
+ public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
+ String procName = proc.getName();
+ String reachedNode = zkProc.getReachedBarrierNode(procName);
+ LOG.debug("Creating reached barrier zk node:" + reachedNode);
+ try {
+ // create the reached znode and watch for the reached znodes
+ ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
+ // loop through all the children of the acquire phase and watch for them
+ for (String node : nodeNames) {
+ String znode = ZKUtil.joinZNode(reachedNode, node);
+ if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
+ byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
+ // ProtobufUtil.isPBMagicPrefix will check null
+ if (dataFromMember != null && dataFromMember.length > 0) {
+ if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
+ String msg =
+ "Failed to get data from finished node or data is illegally formatted: " + znode;
+ LOG.error(msg);
+ throw new IOException(msg);
+ } else {
+ dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
+ dataFromMember.length);
+ coordinator.memberFinishedBarrier(procName, node, dataFromMember);
+ }
+ } else {
+ coordinator.memberFinishedBarrier(procName, node, dataFromMember);
+ }
+ }
+ }
+ } catch (KeeperException e) {
+ String msg = "Failed while creating reached node:" + reachedNode;
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ } catch (InterruptedException e) {
+ String msg = "Interrupted while creating reached node:" + reachedNode;
+ LOG.error(msg, e);
+ throw new InterruptedIOException(msg);
+ }
+ }
+
+
+ /**
+ * Delete znodes that are no longer in use.
+ */
+ @Override
+ final public void resetMembers(Procedure proc) throws IOException {
+ String procName = proc.getName();
+ boolean stillGettingNotifications = false;
+ do {
+ try {
+ LOG.debug("Attempting to clean out zk node for op:" + procName);
+ zkProc.clearZNodes(procName);
+ stillGettingNotifications = false;
+ } catch (KeeperException.NotEmptyException e) {
+ // recursive delete isn't transactional (yet) so we need to deal with cases where we get
+ // children trickling in
+ stillGettingNotifications = true;
+ } catch (KeeperException e) {
+ String msg = "Failed to complete reset procedure " + procName;
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ } while (stillGettingNotifications);
+ }
+
+ /**
+ * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
+ * @return true if succeed, false if encountered initialization errors.
+ */
+ @Override
+ final public boolean start(final ProcedureCoordinator coordinator) {
+ if (this.coordinator != null) {
+ throw new IllegalStateException(
+ "ZKProcedureCoordinator already started and already has listener installed");
+ }
+ this.coordinator = coordinator;
+
+ try {
+ this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
+ @Override
+ public void nodeCreated(String path) {
+ if (!isInProcedurePath(path)) return;
+ LOG.debug("Node created: " + path);
+ logZKTree(this.baseZNode);
+ if (isAcquiredPathNode(path)) {
+ // node wasn't present when we created the watch so zk event triggers acquire
+ coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
+ ZKUtil.getNodeName(path));
+ } else if (isReachedPathNode(path)) {
+ // node was absent when we created the watch so zk event triggers the finished barrier.
+
+ // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
+ String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
+ String member = ZKUtil.getNodeName(path);
+ // get the data from the procedure member
+ try {
+ byte[] dataFromMember = ZKUtil.getData(watcher, path);
+ // ProtobufUtil.isPBMagicPrefix will check null
+ if (dataFromMember != null && dataFromMember.length > 0) {
+ if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
+ ForeignException ee = new ForeignException(coordName,
+ "Failed to get data from finished node or data is illegally formatted:"
+ + path);
+ coordinator.abortProcedure(procName, ee);
+ } else {
+ dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
+ dataFromMember.length);
+ LOG.debug("Finished data from procedure '" + procName
+ + "' member '" + member + "': " + new String(dataFromMember));
+ coordinator.memberFinishedBarrier(procName, member, dataFromMember);
+ }
+ } else {
+ coordinator.memberFinishedBarrier(procName, member, dataFromMember);
+ }
+ } catch (KeeperException e) {
+ ForeignException ee = new ForeignException(coordName, e);
+ coordinator.abortProcedure(procName, ee);
+ } catch (InterruptedException e) {
+ ForeignException ee = new ForeignException(coordName, e);
+ coordinator.abortProcedure(procName, ee);
+ }
+ } else if (isAbortPathNode(path)) {
+ abort(path);
+ } else {
+ LOG.debug("Ignoring created notification for node:" + path);
+ }
+ }
+ };
+ zkProc.clearChildZNodes();
+ } catch (KeeperException e) {
+ LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
+ return false;
+ }
+
+ LOG.debug("Starting the controller for procedure member:" + coordName);
+ return true;
+ }
+
+ /**
+ * This is the abort message being sent by the coordinator to member
+ *
+ * TODO this code isn't actually used but can be used to issue a cancellation from the
+ * coordinator.
+ */
+ @Override
+ final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
+ String procName = proc.getName();
+ LOG.debug("Aborting procedure '" + procName + "' in zk");
+ String procAbortNode = zkProc.getAbortZNode(procName);
+ try {
+ LOG.debug("Creating abort znode:" + procAbortNode);
+ String source = (ee.getSource() == null) ? coordName : ee.getSource();
+ byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
+ // first create the znode for the procedure
+ ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
+ LOG.debug("Finished creating abort node:" + procAbortNode);
+ } catch (KeeperException e) {
+ // possible that we get this error for the procedure if we already reset the zk state, but in
+ // that case we should still get an error for that procedure anyways
+ zkProc.logZKTree(zkProc.baseZNode);
+ coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
+ + " to abort procedure '" + procName + "'", new IOException(e));
+ }
+ }
+
+ /**
+ * Receive a notification and propagate it to the local coordinator
+ * @param abortNode full znode path to the failed procedure information
+ */
+ protected void abort(String abortNode) {
+ String procName = ZKUtil.getNodeName(abortNode);
+ ForeignException ee = null;
+ try {
+ byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
+ if (data == null || data.length == 0) {
+ // ignore
+ return;
+ } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
+ LOG.warn("Got an error notification for op:" + abortNode
+ + " but we can't read the information. Killing the procedure.");
+ // we got a remote exception, but we can't describe it
+ ee = new ForeignException(coordName,
+ "Data in abort node is illegally formatted. ignoring content.");
+ } else {
+
+ data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+ ee = ForeignException.deserialize(data);
+ }
+ } catch (IOException e) {
+ LOG.warn("Got an error notification for op:" + abortNode
+ + " but we can't read the information. Killing the procedure.");
+ // we got a remote exception, but we can't describe it
+ ee = new ForeignException(coordName, e);
+ } catch (KeeperException e) {
+ coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
+ + zkProc.getAbortZnode(), new IOException(e));
+ } catch (InterruptedException e) {
+ coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
+ + zkProc.getAbortZnode(), new IOException(e));
+ Thread.currentThread().interrupt();
+ }
+ coordinator.abortProcedure(procName, ee);
+ }
+
+ @Override
+ final public void close() throws IOException {
+ zkProc.close();
+ }
+
+ /**
+ * Used in testing
+ */
+ final ZKProcedureUtil getZkProcedureUtil() {
+ return zkProc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
deleted file mode 100644
index 4632d23..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.procedure;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.errorhandling.ForeignException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
- */
-@InterfaceAudience.Private
-public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
- private static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
- private ZKProcedureUtil zkProc = null;
- protected ProcedureCoordinator coordinator = null; // if started this should be non-null
-
- ZooKeeperWatcher watcher;
- String procedureType;
- String coordName;
-
- /**
- * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
- * @param procedureClass procedure type name is a category for when there are multiple kinds of
- * procedures.-- this becomes a znode so be aware of the naming restrictions
- * @param coordName name of the node running the coordinator
- * @throws KeeperException if an unexpected zk error occurs
- */
- public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
- String procedureClass, String coordName) throws KeeperException {
- this.watcher = watcher;
- this.procedureType = procedureClass;
- this.coordName = coordName;
- }
-
- /**
- * The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. If znodes
- * appear, first acquire to relevant listener or sets watch waiting for notification of
- * the acquire node
- *
- * @param proc the Procedure
- * @param info data to be stored in the acquire node
- * @param nodeNames children of the acquire phase
- * @throws IOException if any failure occurs.
- */
- @Override
- final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
- throws IOException, IllegalArgumentException {
- String procName = proc.getName();
- // start watching for the abort node
- String abortNode = zkProc.getAbortZNode(procName);
- try {
- // check to see if the abort node already exists
- if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
- abort(abortNode);
- }
- // If we get an abort node watch triggered here, we'll go complete creating the acquired
- // znode but then handle the acquire znode and bail out
- } catch (KeeperException e) {
- String msg = "Failed while watching abort node:" + abortNode;
- LOG.error(msg, e);
- throw new IOException(msg, e);
- }
-
- // create the acquire barrier
- String acquire = zkProc.getAcquiredBarrierNode(procName);
- LOG.debug("Creating acquire znode:" + acquire);
- try {
- // notify all the procedure listeners to look for the acquire node
- byte[] data = ProtobufUtil.prependPBMagic(info);
- ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
- // loop through all the children of the acquire phase and watch for them
- for (String node : nodeNames) {
- String znode = ZKUtil.joinZNode(acquire, node);
- LOG.debug("Watching for acquire node:" + znode);
- if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
- coordinator.memberAcquiredBarrier(procName, node);
- }
- }
- } catch (KeeperException e) {
- String msg = "Failed while creating acquire node:" + acquire;
- LOG.error(msg, e);
- throw new IOException(msg, e);
- }
- }
-
- @Override
- public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
- String procName = proc.getName();
- String reachedNode = zkProc.getReachedBarrierNode(procName);
- LOG.debug("Creating reached barrier zk node:" + reachedNode);
- try {
- // create the reached znode and watch for the reached znodes
- ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
- // loop through all the children of the acquire phase and watch for them
- for (String node : nodeNames) {
- String znode = ZKUtil.joinZNode(reachedNode, node);
- if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
- byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
- // ProtobufUtil.isPBMagicPrefix will check null
- if (dataFromMember != null && dataFromMember.length > 0) {
- if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
- String msg =
- "Failed to get data from finished node or data is illegally formatted: " + znode;
- LOG.error(msg);
- throw new IOException(msg);
- } else {
- dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
- dataFromMember.length);
- coordinator.memberFinishedBarrier(procName, node, dataFromMember);
- }
- } else {
- coordinator.memberFinishedBarrier(procName, node, dataFromMember);
- }
- }
- }
- } catch (KeeperException e) {
- String msg = "Failed while creating reached node:" + reachedNode;
- LOG.error(msg, e);
- throw new IOException(msg, e);
- } catch (InterruptedException e) {
- String msg = "Interrupted while creating reached node:" + reachedNode;
- LOG.error(msg, e);
- throw new InterruptedIOException(msg);
- }
- }
-
-
- /**
- * Delete znodes that are no longer in use.
- */
- @Override
- final public void resetMembers(Procedure proc) throws IOException {
- String procName = proc.getName();
- boolean stillGettingNotifications = false;
- do {
- try {
- LOG.debug("Attempting to clean out zk node for op:" + procName);
- zkProc.clearZNodes(procName);
- stillGettingNotifications = false;
- } catch (KeeperException.NotEmptyException e) {
- // recursive delete isn't transactional (yet) so we need to deal with cases where we get
- // children trickling in
- stillGettingNotifications = true;
- } catch (KeeperException e) {
- String msg = "Failed to complete reset procedure " + procName;
- LOG.error(msg, e);
- throw new IOException(msg, e);
- }
- } while (stillGettingNotifications);
- }
-
- /**
- * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
- * @return true if succeed, false if encountered initialization errors.
- */
- final public boolean start(final ProcedureCoordinator coordinator) {
- if (this.coordinator != null) {
- throw new IllegalStateException(
- "ZKProcedureCoordinator already started and already has listener installed");
- }
- this.coordinator = coordinator;
-
- try {
- this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
- @Override
- public void nodeCreated(String path) {
- if (!isInProcedurePath(path)) return;
- LOG.debug("Node created: " + path);
- logZKTree(this.baseZNode);
- if (isAcquiredPathNode(path)) {
- // node wasn't present when we created the watch so zk event triggers acquire
- coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
- ZKUtil.getNodeName(path));
- } else if (isReachedPathNode(path)) {
- // node was absent when we created the watch so zk event triggers the finished barrier.
-
- // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
- String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
- String member = ZKUtil.getNodeName(path);
- // get the data from the procedure member
- try {
- byte[] dataFromMember = ZKUtil.getData(watcher, path);
- // ProtobufUtil.isPBMagicPrefix will check null
- if (dataFromMember != null && dataFromMember.length > 0) {
- if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
- ForeignException ee = new ForeignException(coordName,
- "Failed to get data from finished node or data is illegally formatted:"
- + path);
- coordinator.abortProcedure(procName, ee);
- } else {
- dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
- dataFromMember.length);
- LOG.debug("Finished data from procedure '" + procName
- + "' member '" + member + "': " + new String(dataFromMember));
- coordinator.memberFinishedBarrier(procName, member, dataFromMember);
- }
- } else {
- coordinator.memberFinishedBarrier(procName, member, dataFromMember);
- }
- } catch (KeeperException e) {
- ForeignException ee = new ForeignException(coordName, e);
- coordinator.abortProcedure(procName, ee);
- } catch (InterruptedException e) {
- ForeignException ee = new ForeignException(coordName, e);
- coordinator.abortProcedure(procName, ee);
- }
- } else if (isAbortPathNode(path)) {
- abort(path);
- } else {
- LOG.debug("Ignoring created notification for node:" + path);
- }
- }
- };
- zkProc.clearChildZNodes();
- } catch (KeeperException e) {
- LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
- return false;
- }
-
- LOG.debug("Starting the controller for procedure member:" + coordName);
- return true;
- }
-
- /**
- * This is the abort message being sent by the coordinator to member
- *
- * TODO this code isn't actually used but can be used to issue a cancellation from the
- * coordinator.
- */
- @Override
- final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
- String procName = proc.getName();
- LOG.debug("Aborting procedure '" + procName + "' in zk");
- String procAbortNode = zkProc.getAbortZNode(procName);
- try {
- LOG.debug("Creating abort znode:" + procAbortNode);
- String source = (ee.getSource() == null) ? coordName : ee.getSource();
- byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
- // first create the znode for the procedure
- ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
- LOG.debug("Finished creating abort node:" + procAbortNode);
- } catch (KeeperException e) {
- // possible that we get this error for the procedure if we already reset the zk state, but in
- // that case we should still get an error for that procedure anyways
- zkProc.logZKTree(zkProc.baseZNode);
- coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
- + " to abort procedure '" + procName + "'", new IOException(e));
- }
- }
-
- /**
- * Receive a notification and propagate it to the local coordinator
- * @param abortNode full znode path to the failed procedure information
- */
- protected void abort(String abortNode) {
- String procName = ZKUtil.getNodeName(abortNode);
- ForeignException ee = null;
- try {
- byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
- if (data == null || data.length == 0) {
- // ignore
- return;
- } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
- LOG.warn("Got an error notification for op:" + abortNode
- + " but we can't read the information. Killing the procedure.");
- // we got a remote exception, but we can't describe it
- ee = new ForeignException(coordName,
- "Data in abort node is illegally formatted. ignoring content.");
- } else {
-
- data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
- ee = ForeignException.deserialize(data);
- }
- } catch (IOException e) {
- LOG.warn("Got an error notification for op:" + abortNode
- + " but we can't read the information. Killing the procedure.");
- // we got a remote exception, but we can't describe it
- ee = new ForeignException(coordName, e);
- } catch (KeeperException e) {
- coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
- + zkProc.getAbortZnode(), new IOException(e));
- } catch (InterruptedException e) {
- coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
- + zkProc.getAbortZnode(), new IOException(e));
- Thread.currentThread().interrupt();
- }
- coordinator.abortProcedure(procName, ee);
- }
-
- @Override
- final public void close() throws IOException {
- zkProc.close();
- }
-
- /**
- * Used in testing
- */
- final ZKProcedureUtil getZkProcedureUtil() {
- return zkProc;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index 7b624a5..3092114 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
-import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -98,7 +98,7 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
// setup the procedure coordinator
String name = master.getServerName().toString();
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads);
- ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
+ ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(
master.getZooKeeper(), getProcedureSignature(), name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index be4cca0..b3b5113 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -199,13 +199,13 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -385,7 +385,7 @@ public class HRegionServer extends HasThread implements
// WAL roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
- final LogRoller walRoller;
+ protected final LogRoller walRoller;
// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);
@@ -535,7 +535,6 @@ public class HRegionServer extends HasThread implements
// Disable usage of meta replicas in the regionserver
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
-
// Config'ed params
this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -640,7 +639,7 @@ public class HRegionServer extends HasThread implements
int cleanerInterval =
conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
this.compactedFileDischarger =
- new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
+ new CompactedHFilesDischarger(cleanerInterval, this, this);
choreService.scheduleChore(compactedFileDischarger);
}
@@ -859,7 +858,7 @@ public class HRegionServer extends HasThread implements
rspmHost.loadProcedures(conf);
rspmHost.initialize(this);
} catch (KeeperException e) {
- this.abort("Failed to reach zk cluster when creating procedure handler.", e);
+ this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
}
// register watcher for recovering regions
this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
@@ -1948,6 +1947,10 @@ public class HRegionServer extends HasThread implements
return wal;
}
+ public LogRoller getWalRoller() {
+ return walRoller;
+ }
+
@Override
public Connection getConnection() {
return getClusterConnection();
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 9dd85d8..bf14933 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -368,6 +368,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
return false;
}
+ public static boolean isArchivedLogFile(Path p) {
+ String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
+ return p.toString().contains(oldLog);
+ }
+
/**
* Get prefix of the log from its name, assuming WAL name in format of
* log_prefix.filenumber.log_suffix
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index a6c7f68..8a4ed72 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -49,7 +49,6 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
@@ -117,6 +116,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
@@ -137,6 +137,8 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
/**
* Facility for testing HBase. Replacement for
* old HBaseTestCase and HBaseClusterTestCase functionality.
@@ -2171,6 +2173,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
}
+ public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
+ throws IOException {
+ Random r = new Random();
+ byte[] row = new byte[rowSize];
+ for (int i = 0; i < totalRows; i++) {
+ r.nextBytes(row);
+ Put put = new Put(row);
+ put.addColumn(f, new byte[]{0}, new byte[]{0});
+ t.put(put);
+ }
+ }
+
public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
int replicaId)
throws IOException {
@@ -3337,6 +3351,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
/**
+ * Waith until all system table's regions get assigned
+ * @throws IOException
+ */
+ public void waitUntilAllSystemRegionsAssigned() throws IOException {
+ waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
+ waitUntilAllRegionsAssigned(TableName.NAMESPACE_TABLE_NAME);
+ }
+
+ /**
* Wait until all regions for a table in hbase:meta have a non-empty
* info:server, or until timeout. This means all regions have been deployed,
* master has been informed and updated hbase:meta with the regions deployed
@@ -3801,12 +3824,24 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
*/
public static int createPreSplitLoadTestTable(Configuration conf,
HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException {
+
+ return createPreSplitLoadTestTable(conf, desc, hcds,
+ new RegionSplitter.HexStringSplit(), numRegionsPerServer);
+ }
+
+ /**
+ * Creates a pre-split table for load testing. If the table already exists,
+ * logs a warning and continues.
+ * @return the number of regions the table was split into
+ */
+ public static int createPreSplitLoadTestTable(Configuration conf,
+ HTableDescriptor desc, HColumnDescriptor[] hcds,
+ SplitAlgorithm splitter, int numRegionsPerServer) throws IOException {
for (HColumnDescriptor hcd : hcds) {
if (!desc.hasFamily(hcd.getName())) {
desc.addFamily(hcd);
}
}
-
int totalNumberOfRegions = 0;
Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
Admin admin = unmanagedConnection.getAdmin();
@@ -3825,7 +3860,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
"pre-splitting table into " + totalNumberOfRegions + " regions " +
"(regions per server: " + numRegionsPerServer + ")");
- byte[][] splits = new RegionSplitter.HexStringSplit().split(
+ byte[][] splits = splitter.split(
totalNumberOfRegions);
admin.createTable(desc, splits);