Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/13 21:58:32 UTC
svn commit: r1445918 [9/29] - in /hbase/branches/hbase-7290: ./ bin/ conf/
dev-support/ hbase-client/ hbase-common/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/
hbase-common/src/mai...
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java Wed Feb 13 20:58:23 2013
@@ -19,14 +19,11 @@
package org.apache.hadoop.hbase.ipc;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@InterfaceAudience.Private
public class MetricsHBaseServer {
- private static Log LOG = LogFactory.getLog(MetricsHBaseServer.class);
private MetricsHBaseServerSource source;
public MetricsHBaseServer(String serverName, MetricsHBaseServerWrapper wrapper) {
@@ -69,4 +66,4 @@ public class MetricsHBaseServer {
public MetricsHBaseServerSource getMetricsSource() {
return source;
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java Wed Feb 13 20:58:23 2013
@@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.security.User;
import java.net.InetAddress;
@@ -90,7 +91,7 @@ public class RequestContext {
*/
public static void set(User user,
InetAddress remoteAddress,
- Class<? extends VersionedProtocol> protocol) {
+ Class<? extends IpcProtocol> protocol) {
RequestContext ctx = instance.get();
ctx.user = user;
ctx.remoteAddress = remoteAddress;
@@ -111,12 +112,12 @@ public class RequestContext {
private User user;
private InetAddress remoteAddress;
- private Class<? extends VersionedProtocol> protocol;
+ private Class<? extends IpcProtocol> protocol;
// indicates we're within a RPC request invocation
private boolean inRequest;
private RequestContext(User user, InetAddress remoteAddr,
- Class<? extends VersionedProtocol> protocol) {
+ Class<? extends IpcProtocol> protocol) {
this.user = user;
this.remoteAddress = remoteAddr;
this.protocol = protocol;
@@ -130,11 +131,11 @@ public class RequestContext {
return remoteAddress;
}
- public Class<? extends VersionedProtocol> getProtocol() {
+ public Class<? extends IpcProtocol> getProtocol() {
return protocol;
}
public boolean isInRequest() {
return inRequest;
}
-}
+}
\ No newline at end of file
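Context for the hunk above: RequestContext keeps one mutable instance per handler thread in a ThreadLocal, set on RPC entry and cleared on exit. A minimal sketch of that pattern follows, with IpcProtocol reduced to a bare marker interface and the user to a plain String; the names are illustrative simplifications, not the actual HBase types.

import java.net.InetAddress;

// Minimal sketch of the thread-local request-context pattern used by
// RequestContext. IpcProtocol is modeled as a bare marker interface and
// the user as a String; both are simplifications for illustration.
public final class ContextSketch {
  interface IpcProtocol {}

  static final class Ctx {
    String user;
    InetAddress remoteAddress;
    Class<? extends IpcProtocol> protocol;
    boolean inRequest; // true only while an RPC invocation is running
  }

  // One mutable context per handler thread, reused across calls.
  private static final ThreadLocal<Ctx> instance = new ThreadLocal<Ctx>() {
    @Override protected Ctx initialValue() {
      return new Ctx();
    }
  };

  public static void set(String user, InetAddress remoteAddress,
      Class<? extends IpcProtocol> protocol) {
    Ctx ctx = instance.get();
    ctx.user = user;
    ctx.remoteAddress = remoteAddress;
    ctx.protocol = protocol;
    ctx.inRequest = true;
  }

  public static void clear() {
    Ctx ctx = instance.get();
    ctx.user = null;
    ctx.remoteAddress = null;
    ctx.protocol = null;
    ctx.inRequest = false;
  }
}

Because the per-thread object is reused, clear() matters as much as set(): a handler that skips it leaks the previous caller's identity into the next request on that thread.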
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Wed Feb 13 20:58:23 2013
@@ -23,16 +23,18 @@ import com.google.common.base.Function;
import com.google.protobuf.Message;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import java.io.IOException;
import java.net.InetSocketAddress;
-/**
- */
@InterfaceAudience.Private
public interface RpcServer {
+ // TODO: Needs cleanup. Why a 'start', and then a 'startThreads' and an 'openServer'?
+ // Also, the call takes a RpcRequestBody, an already composed combination of
+ // rpc Request and metadata. Should disentangle metadata and rpc Request Message.
void setSocketSendBufSize(int size);
@@ -45,12 +47,12 @@ public interface RpcServer {
InetSocketAddress getListenerAddress();
/** Called for each call.
- * @param param writable parameter
+ * @param param the request parameter
* @param receiveTime time
- * @return Message
+ * @return the protobuf response Message
* @throws java.io.IOException e
*/
- Message call(Class<? extends VersionedProtocol> protocol,
+ Message call(Class<? extends IpcProtocol> protocol,
RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
throws IOException;
@@ -62,9 +64,8 @@ public interface RpcServer {
void startThreads();
-
/**
* Returns the metrics instance for reporting RPC call statistics
*/
MetricsHBaseServer getMetrics();
-}
+}
\ No newline at end of file
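With this change the protocol class itself becomes the routing key for call(). As a hedged illustration only, not the actual HBaseServer internals, dispatch on such a key can be as simple as a registry map:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Illustrative dispatch keyed on Class<? extends IpcProtocol>, mirroring
// the new call(...) signature. The registry is an assumption, not the
// real server implementation.
class DispatchSketch {
  interface IpcProtocol {}

  private final Map<Class<? extends IpcProtocol>, IpcProtocol> services =
      new HashMap<Class<? extends IpcProtocol>, IpcProtocol>();

  <T extends IpcProtocol> void register(Class<T> protocol, T instance) {
    services.put(protocol, instance);
  }

  IpcProtocol instanceFor(Class<? extends IpcProtocol> protocol) throws IOException {
    IpcProtocol instance = services.get(protocol);
    if (instance == null) {
      throw new IOException("Unknown protocol: " + protocol.getName());
    }
    return instance; // call() would then invoke the method named in the RpcRequestBody
  }
}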
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java Wed Feb 13 20:58:23 2013
@@ -121,4 +121,13 @@ public class ServerRpcController impleme
public boolean failedOnException() {
return serviceException != null;
}
+
+ /**
+ * Throws an IOException back out if one is currently stored.
+ */
+ public void checkFailed() throws IOException {
+ if (failedOnException()) {
+ throw getFailedOn();
+ }
+ }
}
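The new checkFailed() folds the existing failedOnException()/getFailedOn() pair into the single idiom callers actually want. A small sketch of the call-site simplification, assuming the usual flow where a coprocessor service method has stored its exception on the controller:

import java.io.IOException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

// Sketch: the call-site idiom that checkFailed() replaces.
class CheckFailedSketch {
  static void rethrowIfFailed(ServerRpcController controller) throws IOException {
    // Before this patch, callers tested and threw by hand:
    if (controller.failedOnException()) {
      throw controller.getFailedOn();
    }
    // With this patch the same check collapses to:
    controller.checkFailed();
  }
}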
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java Wed Feb 13 20:58:23 2013
@@ -106,7 +106,20 @@ public class TableSplit implements Input
Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
}
+ @Override
public int compareTo(TableSplit o) {
return Bytes.compareTo(getStartRow(), o.getStartRow());
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof TableSplit)) {
+ return false;
+ }
+ TableSplit other = (TableSplit)o;
+ return Bytes.equals(m_tableName, other.m_tableName) &&
+ Bytes.equals(m_startRow, other.m_startRow) &&
+ Bytes.equals(m_endRow, other.m_endRow) &&
+ m_regionLocation.equals(other.m_regionLocation);
+ }
}
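One caveat on this hunk: equals(Object) is added without a matching hashCode() override, so equal TableSplit instances could land in different hash buckets if ever used as map or set keys. A hypothetical hashCode over the same four fields, not part of this commit, would restore the contract:

import java.util.Arrays;

// Hypothetical companion to the equals() above (m_tableName, m_startRow
// and m_endRow are byte[] fields); shown only to illustrate the
// equals/hashCode contract, not part of this commit.
@Override
public int hashCode() {
  int result = Arrays.hashCode(m_tableName);
  result = 31 * result + Arrays.hashCode(m_startRow);
  result = 31 * result + Arrays.hashCode(m_endRow);
  result = 31 * result + (m_regionLocation == null ? 0 : m_regionLocation.hashCode());
  return result;
}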
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java Wed Feb 13 20:58:23 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -27,19 +29,18 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* Export an HBase table.
@@ -137,10 +138,10 @@ public class Export {
int batching = conf.getInt(EXPORT_BATCHING, -1);
if (batching != -1){
- try{
+ try {
s.setBatch(batching);
- } catch (RuntimeException e) {
- LOG.error("Batching could not be set", e);
+ } catch (IncompatibleFilterException e) {
+ LOG.error("Batching could not be set", e);
}
}
LOG.info("versions=" + versions + ", starttime=" + startTime +
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Wed Feb 13 20:58:23 2013
@@ -49,17 +49,18 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -82,7 +83,7 @@ import org.apache.hadoop.mapreduce.lib.p
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
- TimeRangeTracker trt = new TimeRangeTracker();
+ private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
private static final String DATABLOCK_ENCODING_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.datablock.encoding";
@@ -106,6 +107,7 @@ public class HFileOutputFormat extends F
// create a map from column family to the compression algorithm
final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+ final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
final HFileDataBlockEncoder encoder;
@@ -166,7 +168,6 @@ public class HFileOutputFormat extends F
// we now have the proper HLog writer. full steam ahead
kv.updateLatestStamp(this.now);
- trt.includeTimestamp(kv);
wl.writer.append(kv);
wl.written += length;
@@ -187,9 +188,9 @@ public class HFileOutputFormat extends F
this.rollRequested = false;
}
- /* Create a new HFile.Writer.
+ /* Create a new StoreFile.Writer.
* @param family
- * @return A WriterLength, containing a new HFile.Writer.
+ * @return A WriterLength, containing a new StoreFile.Writer.
* @throws IOException
*/
private WriterLength getNewWriter(byte[] family, Configuration conf)
@@ -198,20 +199,28 @@ public class HFileOutputFormat extends F
Path familydir = new Path(outputdir, Bytes.toString(family));
String compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression;
- wl.writer = HFile.getWriterFactoryNoCache(conf)
- .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
- .withBlockSize(blocksize)
- .withCompression(compression)
- .withComparator(KeyValue.KEY_COMPARATOR)
+ String bloomTypeStr = bloomTypeMap.get(family);
+ BloomType bloomType = BloomType.NONE;
+ if (bloomTypeStr != null) {
+ bloomType = BloomType.valueOf(bloomTypeStr);
+ }
+ Configuration tempConf = new Configuration(conf);
+ tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+ wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+ .withOutputDir(familydir)
+ .withCompression(AbstractHFileWriter.compressionByName(compression))
+ .withBloomType(bloomType)
+ .withComparator(KeyValue.COMPARATOR)
.withDataBlockEncoder(encoder)
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
- .create();
+ .build();
+
this.writers.put(family, wl);
return wl;
}
- private void close(final HFile.Writer w) throws IOException {
+ private void close(final StoreFile.Writer w) throws IOException {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
@@ -221,8 +230,7 @@ public class HFileOutputFormat extends F
Bytes.toBytes(true));
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude));
- w.appendFileInfo(StoreFile.TIMERANGE_KEY,
- WritableUtils.toByteArray(trt));
+ w.appendTrackedTimestampsToMetadata();
w.close();
}
}
@@ -241,7 +249,7 @@ public class HFileOutputFormat extends F
*/
static class WriterLength {
long written = 0;
- HFile.Writer writer = null;
+ StoreFile.Writer writer = null;
}
/**
@@ -359,7 +367,8 @@ public class HFileOutputFormat extends F
// Set compression algorithms based on column families
configureCompression(table, conf);
-
+ configureBloomType(table, conf);
+
TableMapReduceUtil.addDependencyJars(job);
LOG.info("Incremental table output configured.");
}
@@ -375,25 +384,39 @@ public class HFileOutputFormat extends F
* algorithm
*/
static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
- Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
- String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
- for (String familyConf : compressionConf.split("&")) {
+ return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+ }
+
+ private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+ return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+ }
+
+ /**
+ * Runs inside the task to deserialize the column family to conf value map.
+ *
+ * @param conf the configuration to read the serialized map from
+ * @param confName the configuration key under which the map is stored
+ * @return a map of column family to the given configuration value
+ */
+ private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+ Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+ String confVal = conf.get(confName, "");
+ for (String familyConf : confVal.split("&")) {
String[] familySplit = familyConf.split("=");
if (familySplit.length != 2) {
continue;
}
-
try {
- compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+ confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
URLDecoder.decode(familySplit[1], "UTF-8"));
} catch (UnsupportedEncodingException e) {
// will not happen with UTF-8 encoding
throw new AssertionError(e);
}
}
- return compressionMap;
+ return confValMap;
}
-
+
/**
* Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.
@@ -403,6 +426,8 @@ public class HFileOutputFormat extends F
* @throws IOException
* on failure to read column family descriptors
*/
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
static void configureCompression(HTable table, Configuration conf) throws IOException {
StringBuilder compressionConfigValue = new StringBuilder();
HTableDescriptor tableDescriptor = table.getTableDescriptor();
@@ -423,4 +448,35 @@ public class HFileOutputFormat extends F
// Get rid of the last ampersand
conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
}
+
+ /**
+ * Serialize column family to bloom type map to configuration.
+ * Invoked while configuring the MR job for incremental load.
+ *
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ static void configureBloomType(HTable table, Configuration conf) throws IOException {
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if (tableDescriptor == null) {
+ // could happen with mock table instance
+ return;
+ }
+ StringBuilder bloomTypeConfigValue = new StringBuilder();
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ bloomTypeConfigValue.append('&');
+ }
+ bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+ bloomTypeConfigValue.append('=');
+ BloomType bloomType = familyDescriptor.getBloomFilterType();
+ // check for null before calling toString(), which would otherwise NPE
+ String bloomTypeStr = bloomType == null ?
+ HColumnDescriptor.DEFAULT_BLOOMFILTER : bloomType.toString();
+ bloomTypeConfigValue.append(URLEncoder.encode(bloomTypeStr, "UTF-8"));
+ }
+ conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+ }
}
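configureCompression, configureBloomType and createFamilyConfValueMap all share one wire format: URL-encoded family=value pairs joined by '&' under a single configuration key. A self-contained round-trip sketch of that format (class and method names here are illustrative):

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the "family=value&family=value" encoding used by the
// compression and bloom-type configuration helpers above.
class FamilyConfSketch {
  static String encode(Map<String, String> familyToValue)
      throws UnsupportedEncodingException {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : familyToValue.entrySet()) {
      if (sb.length() > 0) sb.append('&');
      sb.append(URLEncoder.encode(e.getKey(), "UTF-8"));
      sb.append('=');
      sb.append(URLEncoder.encode(e.getValue(), "UTF-8"));
    }
    return sb.toString();
  }

  static Map<String, String> decode(String confVal)
      throws UnsupportedEncodingException {
    Map<String, String> out = new LinkedHashMap<String, String>();
    for (String pair : confVal.split("&")) {
      String[] kv = pair.split("=");
      if (kv.length != 2) continue; // tolerate empty or malformed entries
      out.put(URLDecoder.decode(kv[0], "UTF-8"),
          URLDecoder.decode(kv[1], "UTF-8"));
    }
    return out;
  }
}

Sharing one format is what lets createFamilyCompressionMap and createFamilyBloomMap collapse into a single private helper keyed only by the conf name.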
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java Wed Feb 13 20:58:23 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -49,10 +48,10 @@ import org.apache.hadoop.mapreduce.TaskA
*/
@InterfaceAudience.Public
public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
- private static Log LOG = LogFactory.getLog(HLogInputFormat.class);
+ private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);
- public static String START_TIME_KEY = "hlog.start.time";
- public static String END_TIME_KEY = "hlog.end.time";
+ public static final String START_TIME_KEY = "hlog.start.time";
+ public static final String END_TIME_KEY = "hlog.end.time";
/**
* {@link InputSplit} for {@link HLog} files. Each split represents
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Feb 13 20:58:23 2013
@@ -19,27 +19,41 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
/**
* Import data written by {@link Export}.
@@ -47,9 +61,15 @@ import org.apache.hadoop.util.GenericOpt
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Import {
+ private static final Log LOG = LogFactory.getLog(Import.class);
final static String NAME = "import";
final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+ final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+ final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+
+ // Optional filter to use for mappers
+ private static Filter filter;
/**
* A mapper that just writes out KeyValues.
@@ -72,6 +92,10 @@ public class Import {
throws IOException {
try {
for (KeyValue kv : value.raw()) {
+ kv = filterKv(kv);
+ // skip if we filtered it out
+ if (kv == null) continue;
+
context.write(row, convertKv(kv, cfRenameMap));
}
} catch (InterruptedException e) {
@@ -82,6 +106,7 @@ public class Import {
@Override
public void setup(Context context) {
cfRenameMap = createCfRenameMap(context.getConfiguration());
+ filter = instantiateFilter(context.getConfiguration());
}
}
@@ -91,6 +116,7 @@ public class Import {
static class Importer
extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
+ private UUID clusterId;
/**
* @param row The current table row key.
@@ -116,6 +142,10 @@ public class Import {
Put put = null;
Delete delete = null;
for (KeyValue kv : result.raw()) {
+ kv = filterKv(kv);
+ // skip if we filter it out
+ if (kv == null) continue;
+
kv = convertKv(kv, cfRenameMap);
// Deletes and Puts are gathered and written when finished
if (kv.isDelete()) {
@@ -131,17 +161,106 @@ public class Import {
}
}
if (put != null) {
+ put.setClusterId(clusterId);
context.write(key, put);
}
if (delete != null) {
+ delete.setClusterId(clusterId);
context.write(key, delete);
}
}
@Override
public void setup(Context context) {
- cfRenameMap = createCfRenameMap(context.getConfiguration());
+ Configuration conf = context.getConfiguration();
+ cfRenameMap = createCfRenameMap(conf);
+ filter = instantiateFilter(conf);
+
+ try {
+ HConnection connection = HConnectionManager.getConnection(conf);
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+ clusterId = zkHelper.getUUIDForCluster(zkw);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.error("Problem connecting to ZooKeper during task setup", e);
+ } catch (KeeperException e) {
+ LOG.error("Problem reading ZooKeeper data during task setup", e);
+ } catch (IOException e) {
+ LOG.error("Problem setting up task", e);
+ }
+
+ }
+ }
+
+ /**
+ * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}),
+ * optionally excluding them from the job output
+ * @param conf {@link Configuration} from which to load the filter
+ * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
+ * @throws IllegalArgumentException if the filter is misconfigured
+ */
+ private static Filter instantiateFilter(Configuration conf) {
+ // get the filter, if it was configured
+ Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+ if (filterClass == null) {
+ LOG.debug("No configured filter class, accepting all keyvalues.");
+ return null;
+ }
+ LOG.debug("Attempting to create filter:" + filterClass);
+
+ try {
+ Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+ return (Filter) m.invoke(null, getFilterArgs(conf));
+ } catch (IllegalAccessException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (SecurityException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
+ ArrayList<byte[]> args = new ArrayList<byte[]>();
+ String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+ if (sargs == null) {
+ return args; // no filter arguments were configured
+ }
+ for (String arg : sargs) {
+ // all the filters' instantiation methods expect quoted args since they come from
+ // the shell, so add them here, though it shouldn't really be needed :-/
+ args.add(Bytes.toBytes("'" + arg + "'"));
+ }
+ return args;
+ }
+
+ /**
+ * Attempt to filter out the keyvalue
+ * @param kv {@link KeyValue} on which to apply the filter
+ * @return <tt>null</tt> if the key should not be written, otherwise returns the original
+ * {@link KeyValue}
+ */
+ private static KeyValue filterKv(KeyValue kv) {
+ // apply the filter and skip this kv if the filter doesn't apply
+ if (filter != null) {
+ Filter.ReturnCode code = filter.filterKeyValue(kv);
+ // if it's not an accept type, then skip this kv
+ if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+ .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping key: " + kv + " from filter decision: " + code);
+ }
+ return null;
+ }
}
+ return kv;
}
// helper: create a new KeyValue based on CF rename map
@@ -223,13 +342,33 @@ public class Import {
}
conf.set(CF_RENAME_PROP, sb.toString());
}
-
+
+ /**
+ * Add a Filter to be instantiated on import
+ * @param conf Configuration to update (will be passed to the job)
+ * @param clazz {@link Filter} subclass to instantiate on the server.
+ * @param args List of arguments to pass to the filter on instantiation
+ */
+ public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+ List<String> args) {
+ conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+
+ // build the param string for the key
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < args.size(); i++) {
+ String arg = args.get(i);
+ builder.append(arg);
+ if (i != args.size() - 1) {
+ builder.append(",");
+ }
+ }
+ conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+ }
/**
* Sets up the actual job.
- *
- * @param conf The current configuration.
- * @param args The command line parameters.
+ * @param conf The current configuration.
+ * @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
@@ -242,6 +381,17 @@ public class Import {
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+ // make sure we get the filter in the jars
+ try {
+ Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+ if (filter != null) {
+ TableMapReduceUtil.addDependencyJars(conf, filter);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
if (hfileOutPath != null) {
job.setMapperClass(KeyValueImporter.class);
HTable table = new HTable(conf, tableName);
@@ -274,6 +424,15 @@ public class Import {
System.err.println("By default Import will load data directly into HBase. To instead generate");
System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+ System.err
+ .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+ System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+ System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+ System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+ + CF_RENAME_PROP + " property. Futher, filters will only use the"
+ + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+ + "the KeyValue.");
System.err.println("For performance consider the following options:\n"
+ " -Dmapred.map.tasks.speculative.execution=false\n"
+ " -Dmapred.reduce.tasks.speculative.execution=false");
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Feb 13 20:58:23 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.HB
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -71,11 +72,13 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -91,19 +94,30 @@ import com.google.common.util.concurrent
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LoadIncrementalHFiles extends Configured implements Tool {
- private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
- static AtomicLong regionCount = new AtomicLong(0);
+ private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+ static final AtomicLong regionCount = new AtomicLong(0);
private HBaseAdmin hbAdmin;
private Configuration cfg;
- public static String NAME = "completebulkload";
- private static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+ public static final String NAME = "completebulkload";
+ private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
private boolean assignSeqIds;
- public LoadIncrementalHFiles(Configuration conf) throws Exception {
+ private boolean useSecure;
+ private Token<?> userToken;
+ private String bulkToken;
+
+ //package private for testing
+ LoadIncrementalHFiles(Configuration conf, Boolean useSecure) throws Exception {
super(conf);
this.cfg = conf;
this.hbAdmin = new HBaseAdmin(conf);
+ // added simply for testing
+ this.useSecure = useSecure != null ? useSecure : User.isHBaseSecurityEnabled(conf);
+ }
+
+ public LoadIncrementalHFiles(Configuration conf) throws Exception {
+ this(conf, null);
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
}
@@ -215,6 +229,18 @@ public class LoadIncrementalHFiles exten
return;
}
+ //If using secure bulk load
+ //prepare staging directory and token
+ if(useSecure) {
+ FileSystem fs = FileSystem.get(cfg);
+ //This condition is here for unit testing
+ //Since delegation token doesn't work in mini cluster
+ if(User.isSecurityEnabled()) {
+ userToken = fs.getDelegationToken("renewer");
+ }
+ bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getTableName());
+ }
+
// Assumes that region splits can happen while this occurs.
while (!queue.isEmpty()) {
// need to reload split keys each iteration.
@@ -243,6 +269,18 @@ public class LoadIncrementalHFiles exten
}
} finally {
+ if(useSecure) {
+ if(userToken != null) {
+ try {
+ userToken.cancel(cfg);
+ } catch (Exception e) {
+ LOG.warn("Failed to cancel HDFS delegation token.", e);
+ }
+ }
+ if(bulkToken != null) {
+ new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
+ }
+ }
pool.shutdown();
if (queue != null && !queue.isEmpty()) {
StringBuilder err = new StringBuilder();
@@ -476,11 +514,47 @@ public class LoadIncrementalHFiles exten
tableName, first) {
@Override
public Boolean call() throws Exception {
- LOG.debug("Going to connect to server " + location + " for row "
- + Bytes.toStringBinary(row));
- byte[] regionName = location.getRegionInfo().getRegionName();
- return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName,
- assignSeqIds);
+ SecureBulkLoadClient secureClient = null;
+ boolean success = false;
+
+ try {
+ LOG.debug("Going to connect to server " + location + " for row "
+ + Bytes.toStringBinary(row));
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ if(!useSecure) {
+ success = ProtobufUtil.bulkLoadHFile(server, famPaths, regionName, assignSeqIds);
+ } else {
+ HTable table = new HTable(conn.getConfiguration(), tableName);
+ secureClient = new SecureBulkLoadClient(table);
+ success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, location.getRegionInfo().getStartKey());
+ }
+ return success;
+ } finally {
+ //Best effort copying of files that might not have been imported
+ //from the staging directory back to original location
+ //in user directory
+ if(secureClient != null && !success) {
+ FileSystem fs = FileSystem.get(cfg);
+ for(Pair<byte[], String> el : famPaths) {
+ Path hfileStagingPath = null;
+ Path hfileOrigPath = new Path(el.getSecond());
+ try {
+ hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
+ hfileOrigPath.getName());
+ if(fs.rename(hfileStagingPath, hfileOrigPath)) {
+ LOG.debug("Moved back file " + hfileOrigPath + " from " +
+ hfileStagingPath);
+ } else if(fs.exists(hfileStagingPath)){
+ LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+ hfileStagingPath);
+ }
+ } catch(Exception ex) {
+ LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+ hfileStagingPath, ex);
+ }
+ }
+ }
+ }
}
};
@@ -626,11 +700,11 @@ public class LoadIncrementalHFiles exten
}
HTableDescriptor htd = new HTableDescriptor(tableName);
- HColumnDescriptor hcd = null;
+ HColumnDescriptor hcd;
// Add column families
// Build a set of keys
- byte[][] keys = null;
+ byte[][] keys;
TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (FileStatus stat : familyDirStatuses) {
@@ -667,10 +741,10 @@ public class LoadIncrementalHFiles exten
" last=" + Bytes.toStringBinary(last));
// To eventually infer start key-end key boundaries
- Integer value = map.containsKey(first)?(Integer)map.get(first):0;
+ Integer value = map.containsKey(first)? map.get(first):0;
map.put(first, value+1);
- value = map.containsKey(last)?(Integer)map.get(last):0;
+ value = map.containsKey(last)? map.get(last):0;
map.put(last, value-1);
} finally {
reader.close();
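Taken together, the secure path wraps the existing load in a three-step token lifecycle: prepareBulkLoad before the queue drains, bulkLoadHFiles per region, and cleanup in a finally block. A condensed sketch of that flow, with the per-region retry loop and the staging-path rollback elided:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;

// Condensed sketch of the secure bulk-load lifecycle added above
// (error handling, retries and staging rollback elided).
class SecureLoadSketch {
  static void load(HTable table, FileSystem fs, Configuration cfg,
      List<Pair<byte[], String>> famPaths, byte[] startKey) throws Exception {
    Token<?> userToken = fs.getDelegationToken("renewer"); // skipped on insecure mini clusters
    SecureBulkLoadClient client = new SecureBulkLoadClient(table);
    String bulkToken = client.prepareBulkLoad(table.getTableName());
    try {
      client.bulkLoadHFiles(famPaths, userToken, bulkToken, startKey);
    } finally {
      userToken.cancel(cfg); // best effort; logged and ignored in the real code
      client.cleanupBulkLoad(bulkToken);
    }
  }
}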
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Wed Feb 13 20:58:23 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.Immuta
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -219,7 +220,8 @@ extends InputFormat<ImmutableBytesWritab
private String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
- hostName = DNS.reverseDns(ipAddress, this.nameServer);
+ hostName = Strings.domainNamePointerToHostName(
+ DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
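DNS.reverseDns returns the PTR record text, which is conventionally a fully qualified name with a trailing dot ("host.example.com."), while region server host names carry none, so locality matching of splits could silently fail before this normalization. A minimal stand-in for the helper, assuming it only trims that trailing dot (the real Strings.domainNamePointerToHostName may do more):

// Minimal stand-in for Strings.domainNamePointerToHostName: assumes the
// helper only trims the PTR record's trailing dot ("host.example.com." ->
// "host.example.com").
class PtrSketch {
  static String domainNamePointerToHostName(String dnPtr) {
    if (dnPtr == null) {
      return null;
    }
    return dnPtr.endsWith(".") ? dnPtr.substring(0, dnPtr.length() - 1) : dnPtr;
  }
}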
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Wed Feb 13 20:58:23 2013
@@ -126,7 +126,7 @@ public class WALPlayer extends Configure
Delete del = null;
KeyValue lastKV = null;
for (KeyValue kv : value.getKeyValues()) {
- // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
+ // filtering HLog meta entries
if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Wed Feb 13 20:58:23 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,7 +39,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import com.google.common.collect.LinkedHashMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -85,6 +83,8 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
+import com.google.common.collect.LinkedHashMultimap;
+
/**
* Manages and performs region assignment.
* <p>
@@ -162,8 +162,10 @@ public class AssignmentManager extends Z
* that ServerShutdownHandler can be fully enabled and re-assign regions
* of dead servers. So that when re-assignment happens, AssignmentManager
* has proper region states.
+ *
+ * Protected to ease testing.
*/
- final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
+ protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
/**
* Constructs a new assignment manager.
@@ -610,7 +612,7 @@ public class AssignmentManager extends Z
*/
private void handleRegion(final RegionTransition rt, int expectedVersion) {
if (rt == null) {
- LOG.warn("Unexpected NULL input " + rt);
+ LOG.warn("Unexpected NULL input for RegionTransition rt");
return;
}
final ServerName sn = rt.getServerName();
@@ -1059,13 +1061,27 @@ public class AssignmentManager extends Z
ZKUtil.listChildrenAndWatchForNewChildren(
watcher, watcher.assignmentZNode);
if (children != null) {
+ Stat stat = new Stat();
for (String child : children) {
// if region is in transition, we already have a watch
// on it, so no need to watch it again. So, as I know for now,
// this is needed to watch splitting nodes only.
if (!regionStates.isRegionInTransition(child)) {
- ZKUtil.watchAndCheckExists(watcher,
- ZKUtil.joinZNode(watcher.assignmentZNode, child));
+ stat.setVersion(0);
+ byte[] data = ZKAssign.getDataAndWatch(watcher,
+ ZKUtil.joinZNode(watcher.assignmentZNode, child), stat);
+ if (data != null && stat.getVersion() > 0) {
+ try {
+ RegionTransition rt = RegionTransition.parseFrom(data);
+
+ //See HBASE-7551, handle splitting too, in case we miss the node change event
+ if (rt.getEventType() == EventType.RS_ZK_REGION_SPLITTING) {
+ handleRegion(rt, stat.getVersion());
+ }
+ } catch (DeserializationException de) {
+ LOG.error("error getting data for " + child, de);
+ }
+ }
}
}
}
@@ -1461,6 +1477,7 @@ public class AssignmentManager extends Z
return;
}
// This never happens. Currently regionserver close always return true.
+ // TODO: this can now happen (0.96) if there is an exception in a coprocessor
LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
region.getRegionNameAsString());
} catch (Throwable t) {
@@ -2633,11 +2650,11 @@ public class AssignmentManager extends Z
threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
}
- boolean isCarryingRoot(ServerName serverName) {
+ public boolean isCarryingRoot(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.ROOT_REGIONINFO);
}
- boolean isCarryingMeta(ServerName serverName) {
+ public boolean isCarryingMeta(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Wed Feb 13 20:58:23 2013
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ServerNam
/**
* Run bulk assign. Does one RPC per regionserver passing a
- * batch of regions using {@link SingleServerBulkAssigner}.
+ * batch of regions using {@link GeneralBulkAssigner.SingleServerBulkAssigner}.
*/
@InterfaceAudience.Private
public class GeneralBulkAssigner extends BulkAssigner {
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java Wed Feb 13 20:58:23 2013
@@ -64,6 +64,7 @@ public class MasterCoprocessorHost
private MasterServices masterServices;
MasterCoprocessorHost(final MasterServices services, final Configuration conf) {
+ this.conf = conf;
this.masterServices = services;
loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY);
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java Wed Feb 13 20:58:23 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
import com.google.protobuf.Service;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -80,6 +81,63 @@ public interface MasterServices extends
throws IOException;
/**
+ * Delete a table
+ * @param tableName The table name
+ * @throws IOException
+ */
+ public void deleteTable(final byte[] tableName) throws IOException;
+
+ /**
+ * Modify the descriptor of an existing table
+ * @param tableName The table name
+ * @param descriptor The updated table descriptor
+ * @throws IOException
+ */
+ public void modifyTable(final byte[] tableName, final HTableDescriptor descriptor)
+ throws IOException;
+
+ /**
+ * Enable an existing table
+ * @param tableName The table name
+ * @throws IOException
+ */
+ public void enableTable(final byte[] tableName) throws IOException;
+
+ /**
+ * Disable an existing table
+ * @param tableName The table name
+ * @throws IOException
+ */
+ public void disableTable(final byte[] tableName) throws IOException;
+
+ /**
+ * Add a new column to an existing table
+ * @param tableName The table name
+ * @param column The column definition
+ * @throws IOException
+ */
+ public void addColumn(final byte[] tableName, final HColumnDescriptor column)
+ throws IOException;
+
+ /**
+ * Modify the column descriptor of an existing column in an existing table
+ * @param tableName The table name
+ * @param descriptor The updated column definition
+ * @throws IOException
+ */
+ public void modifyColumn(byte[] tableName, HColumnDescriptor descriptor)
+ throws IOException;
+
+ /**
+ * Delete a column from an existing table
+ * @param tableName The table name
+ * @param columnName The column name
+ * @throws IOException
+ */
+ public void deleteColumn(final byte[] tableName, final byte[] columnName)
+ throws IOException;
+
+ /**
* @return Return table descriptors implementation.
*/
public TableDescriptors getTableDescriptors();
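These additions widen MasterServices so that master-side code, for example a coprocessor handed a MasterServices reference, can drive schema operations directly. An illustrative consumer; the disable/enable bracketing is an assumption about alter-time rules, not something this interface enforces:

import java.io.IOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative consumer of the new MasterServices schema methods.
class SchemaHelperSketch {
  static void addFamily(MasterServices services, byte[] tableName, String family)
      throws IOException {
    services.disableTable(tableName); // assumed required before the alter
    services.addColumn(tableName, new HColumnDescriptor(Bytes.toBytes(family)));
    services.enableTable(tableName);
  }
}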
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java Wed Feb 13 20:58:23 2013
@@ -117,6 +117,18 @@ public class RegionPlan implements Compa
}
@Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ RegionPlan other = (RegionPlan) obj;
+ return compareTo(other) == 0;
+ }
+
+ @Override
public String toString() {
return "hri=" + this.hri.getRegionNameAsString() + ", src=" +
(this.source == null? "": this.source.toString()) +
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Wed Feb 13 20:58:23 2013
@@ -380,7 +380,7 @@ public class ServerManager {
public double getAverageLoad() {
int totalLoad = 0;
int numServers = 0;
- double averageLoad = 0.0;
+ double averageLoad;
for (ServerLoad sl: this.onlineServers.values()) {
numServers++;
totalLoad += sl.getNumberOfRegions();
@@ -680,7 +680,7 @@ public class ServerManager {
*/
private AdminProtocol getServerConnection(final ServerName sn)
throws IOException {
- AdminProtocol admin = this.serverConnections.get(sn.toString());
+ AdminProtocol admin = this.serverConnections.get(sn);
if (admin == null) {
LOG.debug("New connection to " + sn.toString());
admin = this.connection.getAdmin(sn.getHostname(), sn.getPort());
@@ -886,7 +886,7 @@ public class ServerManager {
* To clear any dead server with same host name and port of any online server
*/
void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
- ServerName sn = null;
+ ServerName sn;
for (ServerName serverName : getOnlineServersList()) {
while ((sn = ServerName.
findServerWithSameHostnamePort(this.deadservers, serverName)) != null) {
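The getServerConnection fix a few hunks up is a classic generics pitfall worth spelling out: Map.get takes Object, so looking up a Map keyed by ServerName with sn.toString() compiles cleanly yet never matches, and every call quietly opened a fresh connection. The same bug distilled:

import java.util.HashMap;
import java.util.Map;

// Distilled version of the cache-miss bug fixed above: Map.get(Object)
// accepts any type, so a wrong-typed key compiles but never matches.
class WrongKeySketch {
  public static void main(String[] args) {
    Map<Integer, String> cache = new HashMap<Integer, String>();
    cache.put(42, "connection");
    System.out.println(cache.get("42")); // null: a String never equals an Integer
    System.out.println(cache.get(42));   // "connection"
  }
}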
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed Feb 13 20:58:23 2013
@@ -26,6 +26,7 @@ import static org.apache.hadoop.hbase.ma
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,10 +41,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
@@ -123,6 +125,8 @@ public class SplitLogManager extends Zoo
private volatile Set<ServerName> deadWorkers = null;
private final Object deadWorkersLock = new Object();
+ private Set<String> failedDeletions = null;
+
/**
* Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
* Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
@@ -180,6 +184,8 @@ public class SplitLogManager extends Zoo
this.serverName = serverName;
this.timeoutMonitor =
new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
+
+ this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
}
public void finishInitialization(boolean masterRecovery) {
@@ -194,7 +200,7 @@ public class SplitLogManager extends Zoo
}
}
- private FileStatus[] getFileList(List<Path> logDirs) throws IOException {
+ private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
List<FileStatus> fileStatus = new ArrayList<FileStatus>();
for (Path hLogDir : logDirs) {
this.fs = hLogDir.getFileSystem(conf);
@@ -202,8 +208,7 @@ public class SplitLogManager extends Zoo
LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
continue;
}
- // TODO filter filenames?
- FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, null);
+ FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
if (logfiles == null || logfiles.length == 0) {
LOG.info(hLogDir + " is empty dir, no logs to split");
} else {
@@ -228,6 +233,7 @@ public class SplitLogManager extends Zoo
logDirs.add(logDir);
return splitLogDistributed(logDirs);
}
+
/**
* The caller will block until all the log files of the given region server
* have been processed - successfully split or an error is encountered - by an
@@ -239,9 +245,25 @@ public class SplitLogManager extends Zoo
* @return cumulative size of the logfiles split
*/
public long splitLogDistributed(final List<Path> logDirs) throws IOException {
+ return splitLogDistributed(logDirs, null);
+ }
+
+ /**
+ * The caller will block until all the META log files of the given region server
+ * have been processed - successfully split or an error is encountered - by an
+ * available worker region server. This method must only be called after the
+ * region servers have been brought online.
+ *
+ * @param logDirs List of log dirs to split
+ * @param filter the Path filter to select specific files for consideration
+ * @throws IOException If there was an error while splitting any log file
+ * @return cumulative size of the logfiles split
+ */
+ public long splitLogDistributed(final List<Path> logDirs, PathFilter filter)
+ throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus(
"Doing distributed log split in " + logDirs);
- FileStatus[] logfiles = getFileList(logDirs);
+ FileStatus[] logfiles = getFileList(logDirs, filter);
status.setStatus("Checking directory contents...");
LOG.debug("Scheduling batch of logs to split");
SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
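For illustration, a minimal sketch (Java) of calling the new filtered overload. It assumes META logs can be recognized by a ".meta" name suffix and that splitLogManager is an existing SplitLogManager instance; both are assumptions for the sketch, not something the code above guarantees.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Split only the logs whose names end in ".meta" (assumed convention).
PathFilter metaFilter = new PathFilter() {
  @Override
  public boolean accept(Path p) {
    return p.getName().endsWith(".meta");
  }
};
List<Path> logDirs = new ArrayList<Path>();
logDirs.add(new Path("/hbase/.logs/host1,60020,1360000000000"));
long splitBytes = splitLogManager.splitLogDistributed(logDirs, metaFilter);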
@@ -418,11 +440,12 @@ public class SplitLogManager extends Zoo
}
}
}
- // delete the task node in zk. Keep trying indefinitely - its an async
+ // delete the task node in zk. It's an async
// call and no one is blocked waiting for this node to be deleted. All
    // task names are unique (log.<timestamp>) so there is no risk of deleting
// a future task.
- deleteNode(path, Long.MAX_VALUE);
+ // if a deletion fails, TimeoutMonitor will retry the same deletion later
+ deleteNode(path, zkretries);
return;
}
@@ -531,6 +554,21 @@ public class SplitLogManager extends Zoo
}
}
+ /**
+ * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
+ * @param statusCode integer value of a ZooKeeper exception code
+   * @param action a description of the action being retried
+   * @return true if retries should be abandoned, otherwise false
+ */
+ private boolean needAbandonRetries(int statusCode, String action) {
+ if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+ LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
+ + "action=" + action);
+ return true;
+ }
+ return false;
+ }
+
private void heartbeat(String path, int new_version, ServerName workerName) {
Task task = findOrCreateOrphanTask(path);
if (new_version != task.last_version) {
@@ -662,8 +700,7 @@ public class SplitLogManager extends Zoo
}
private void deleteNodeFailure(String path) {
- LOG.fatal("logic failure, failing to delete a node should never happen " +
- "because delete has infinite retries");
+ LOG.info("Failed to delete node " + path + " and will retry soon.");
return;
}
@@ -847,7 +884,7 @@ public class SplitLogManager extends Zoo
volatile long last_update;
volatile int last_version;
volatile ServerName cur_worker_name;
- TaskBatch batch;
+ volatile TaskBatch batch;
volatile TerminationStatus status;
volatile int incarnation;
volatile int unforcedResubmits;
@@ -1005,6 +1042,16 @@ public class SplitLogManager extends Zoo
SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
LOG.debug("resubmitting unassigned task(s) after timeout");
}
+
+ // Retry previously failed deletes
+ if (failedDeletions.size() > 0) {
+ List<String> tmpPaths = new ArrayList<String>(failedDeletions);
+ for (String tmpPath : tmpPaths) {
+ // deleteNode is an async call
+ deleteNode(tmpPath, zkretries);
+ }
+ failedDeletions.removeAll(tmpPaths);
+ }
}
}
@@ -1019,6 +1066,10 @@ public class SplitLogManager extends Zoo
public void processResult(int rc, String path, Object ctx, String name) {
SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
if (rc != 0) {
+ if (needAbandonRetries(rc, "Create znode " + path)) {
+ createNodeFailure(path);
+ return;
+ }
if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
// What if there is a delete pending against this pre-existing
// znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
@@ -1058,8 +1109,7 @@ public class SplitLogManager extends Zoo
Stat stat) {
SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
if (rc != 0) {
- if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
- LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries.");
+ if (needAbandonRetries(rc, "GetData from znode " + path)) {
return;
}
if (rc == KeeperException.Code.NONODE.intValue()) {
@@ -1113,6 +1163,10 @@ public class SplitLogManager extends Zoo
public void processResult(int rc, String path, Object ctx) {
SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
if (rc != 0) {
+ if (needAbandonRetries(rc, "Delete znode " + path)) {
+ failedDeletions.add(path);
+ return;
+ }
if (rc != KeeperException.Code.NONODE.intValue()) {
SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
Long retry_count = (Long) ctx;
@@ -1120,13 +1174,14 @@ public class SplitLogManager extends Zoo
path + " remaining retries=" + retry_count);
if (retry_count == 0) {
LOG.warn("delete failed " + path);
+ failedDeletions.add(path);
deleteNodeFailure(path);
} else {
deleteNode(path, retry_count - 1);
}
return;
} else {
- LOG.debug(path +
+ LOG.info(path +
" does not exist. Either was created but deleted behind our" +
" back by another pending delete OR was deleted" +
" in earlier retry rounds. zkretries = " + (Long) ctx);
@@ -1151,8 +1206,7 @@ public class SplitLogManager extends Zoo
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc != 0) {
- if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
- LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries.");
+ if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
return;
}
Long retry_count = (Long)ctx;
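Taken together, the SplitLogManager changes replace the old unbounded deleteNode() retries with a bounded retry count plus a synchronized failedDeletions set that the TimeoutMonitor drains on each period. A condensed, self-contained sketch of that pattern follows; the class and method names are illustrative, not the actual HBase types.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RetryingDeleter {
  private final Set<String> failedDeletions =
      Collections.synchronizedSet(new HashSet<String>());

  // Called from the async delete callback when retries are exhausted or
  // the session has expired: remember the path instead of looping forever.
  void onDeleteFailed(String path) {
    failedDeletions.add(path);
  }

  // Called periodically (the role TimeoutMonitor plays above): re-issue
  // the async deletes; any that fail again land back in the set.
  void chore() {
    List<String> tmpPaths = new ArrayList<String>(failedDeletions);
    for (String path : tmpPaths) {
      deleteAsync(path);
    }
    failedDeletions.removeAll(tmpPaths);
  }

  // Placeholder for an async ZooKeeper delete with a bounded retry count.
  void deleteAsync(String path) {
  }
}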
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java Wed Feb 13 20:58:23 2013
@@ -145,13 +145,23 @@ public abstract class CleanerChore<T ext
* @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
* @throws IOException if there is an unexpected filesystem error
*/
- private boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
+ public boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Checking directory: " + toCheck);
}
FileStatus[] children = FSUtils.listStatus(fs, toCheck);
// if the directory doesn't exist, then we are done
- if (children == null) return true;
+ if (children == null) {
+ try {
+ return fs.delete(toCheck, false);
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Couldn't delete directory: " + toCheck, e);
+ }
+ }
+      // the delete attempt threw an exception, so we can't report success.
+ return false;
+ }
boolean canDeleteThis = true;
for (FileStatus child : children) {
@@ -168,9 +178,22 @@ public abstract class CleanerChore<T ext
}
}
- // if all the children have been deleted, then we should try to delete this directory. However,
- // don't do so recursively so we don't delete files that have been added since we checked.
- return canDeleteThis ? fs.delete(toCheck, false) : false;
+ // if the directory has children, we can't delete it, so we are done
+ if (!canDeleteThis) return false;
+
+ // otherwise, all the children (that we know about) have been deleted, so we should try to
+ // delete this directory. However, don't do so recursively so we don't delete files that have
+ // been added since we last checked.
+ try {
+ return fs.delete(toCheck, false);
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Couldn't delete directory: " + toCheck, e);
+ }
+ }
+
+    // the delete attempt threw an exception, so we can't report success.
+ return false;
}
/**
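The correctness of checkAndDeleteDirectory hinges on FileSystem.delete(path, false) being non-recursive: it fails on a non-empty directory instead of removing its contents, so a file that arrives between the child scan and the delete makes the delete fail rather than be silently lost. A minimal sketch of that idiom, assuming standard Hadoop FileSystem semantics:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EmptyDirDeleter {
  // Remove a directory only if it is empty; with recursive=false the
  // filesystem refuses to delete a non-empty directory, so concurrent
  // additions are never swept away.
  static boolean tryDeleteEmptyDir(FileSystem fs, Path dir) {
    try {
      return fs.delete(dir, false); // false => non-recursive
    } catch (IOException e) {
      return false; // not empty or transient error; retry on the next run
    }
  }
}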
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java Wed Feb 13 20:58:23 2013
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.util.Envi
/**
* HFile cleaner that uses the timestamp of the hfile to determine if it should be deleted. By
- * default they are allowed to live for {@value TimeToLiveHFileCleaner#DEFAULT_TTL}
+ * default they are allowed to live for {@value #DEFAULT_TTL}
*/
@InterfaceAudience.Private
public class TimeToLiveHFileCleaner extends BaseHFileCleanerDelegate {
@@ -38,7 +38,7 @@ public class TimeToLiveHFileCleaner exte
public static final Log LOG = LogFactory.getLog(TimeToLiveHFileCleaner.class.getName());
public static final String TTL_CONF_KEY = "hbase.master.hfilecleaner.ttl";
// default ttl = 5 minutes
- private static final long DEFAULT_TTL = 60000 * 5;
+ public static final long DEFAULT_TTL = 60000 * 5;
// Configured time a hfile can be kept after it was moved to the archive
private long ttl;
private FileSystem fs;
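Making DEFAULT_TTL public lets the {@value #DEFAULT_TTL} tag (and tests) reference the constant. The deletability decision itself is, in outline, a comparison of the hfile's age against the configured TTL; the sketch below assumes the cleaner keys off the file's modification time and uses the system clock (the real cleaner may source its clock differently).

import org.apache.hadoop.fs.FileStatus;

public class TtlCheck {
  // A file is deletable once it has outlived the TTL.
  static boolean isFileDeletable(FileStatus fStat, long ttlMillis) {
    long life = System.currentTimeMillis() - fStat.getModificationTime();
    return life > ttlMillis;
  }
}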
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java Wed Feb 13 20:58:23 2013
@@ -97,7 +97,7 @@ public class DeleteTableHandler extends
FileSystem fs = mfs.getFileSystem();
for (HRegionInfo hri: regions) {
LOG.debug("Deleting region " + hri.getRegionNameAsString() + " from FS");
- HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
+ HFileArchiver.archiveRegion(masterServices.getConfiguration(), fs, mfs.getRootDir(),
tempTableDir, new Path(tempTableDir, hri.getEncodedName()));
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java Wed Feb 13 20:58:23 2013
@@ -18,11 +18,17 @@
*/
package org.apache.hadoop.hbase.master.handler;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.zookeeper.KeeperException;
/**
* Shutdown handler for the server hosting <code>-ROOT-</code>,
@@ -32,7 +38,7 @@ import org.apache.hadoop.hbase.master.Ma
public class MetaServerShutdownHandler extends ServerShutdownHandler {
private final boolean carryingRoot;
private final boolean carryingMeta;
-
+ private static final Log LOG = LogFactory.getLog(MetaServerShutdownHandler.class);
public MetaServerShutdownHandler(final Server server,
final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
@@ -44,11 +50,118 @@ public class MetaServerShutdownHandler e
}
@Override
+ public void process() throws IOException {
+ try {
+      if (this.shouldSplitHlog) {
+        LOG.info("Splitting META logs for " + serverName);
+        this.services.getMasterFileSystem().splitMetaLog(serverName);
+      }
+ } catch (IOException ioe) {
+ this.services.getExecutorService().submit(this);
+ this.deadServers.add(serverName);
+ throw new IOException("failed log splitting for " +
+ serverName + ", will retry", ioe);
+ }
+
+ // Assign root and meta if we were carrying them.
+ if (isCarryingRoot()) { // -ROOT-
+      // Check again: the region may have been assigned elsewhere because of
+      // an RIT timeout
+ if (this.services.getAssignmentManager().isCarryingRoot(serverName)) {
+ LOG.info("Server " + serverName
+ + " was carrying ROOT. Trying to assign.");
+ this.services.getAssignmentManager().regionOffline(
+ HRegionInfo.ROOT_REGIONINFO);
+ verifyAndAssignRootWithRetries();
+ } else {
+        LOG.info("ROOT has been assigned elsewhere, skipping assignment.");
+ }
+ }
+
+ // Carrying meta?
+ if (isCarryingMeta()) {
+      // Check again: the region may have been assigned elsewhere because of
+      // an RIT timeout
+ if (this.services.getAssignmentManager().isCarryingMeta(serverName)) {
+ LOG.info("Server " + serverName
+ + " was carrying META. Trying to assign.");
+ this.services.getAssignmentManager().regionOffline(
+ HRegionInfo.FIRST_META_REGIONINFO);
+ this.services.getAssignmentManager().assignMeta();
+ } else {
+        LOG.info("META has been assigned elsewhere, skipping assignment.");
+ }
+
+ }
+ super.process();
+ }
+ /**
+   * Before assigning the ROOT region, ensure that it has not already
+   * been assigned elsewhere.
+   * <p>
+   * Under some scenarios the ROOT region can be opened twice, making it
+   * appear online on two region servers at the same time.
+   * If the ROOT region has already been assigned, the operation is canceled.
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws KeeperException
+ */
+ private void verifyAndAssignRoot()
+ throws InterruptedException, IOException, KeeperException {
+ long timeout = this.server.getConfiguration().
+ getLong("hbase.catalog.verification.timeout", 1000);
+ if (!this.server.getCatalogTracker().verifyRootRegionLocation(timeout)) {
+ this.services.getAssignmentManager().assignRoot();
+ } else if (serverName.equals(server.getCatalogTracker().getRootLocation())) {
+ throw new IOException("-ROOT- is onlined on the dead server "
+ + serverName);
+ } else {
+      LOG.info("Skipping assignment of -ROOT- because it is already online at "
+        + server.getCatalogTracker().getRootLocation());
+ }
+ }
+
+ /**
+   * Retry verifyAndAssignRoot() until it succeeds; abort the master if it
+   * still fails after the configured number of retries.
+ * @throws IOException
+ */
+ private void verifyAndAssignRootWithRetries() throws IOException {
+ int iTimes = this.server.getConfiguration().getInt(
+ "hbase.catalog.verification.retries", 10);
+
+ long waitTime = this.server.getConfiguration().getLong(
+ "hbase.catalog.verification.timeout", 1000);
+
+ int iFlag = 0;
+ while (true) {
+ try {
+ verifyAndAssignRoot();
+ break;
+ } catch (KeeperException e) {
+ this.server.abort("In server shutdown processing, assigning root", e);
+ throw new IOException("Aborting", e);
+ } catch (Exception e) {
+ if (iFlag >= iTimes) {
+          this.server.abort("verifyAndAssignRoot failed after " + iTimes
+            + " retries, aborting", e);
+ throw new IOException("Aborting", e);
+ }
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e1) {
+          LOG.warn("Interrupted while sleeping between retries", e1);
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted", e1);
+ }
+ iFlag++;
+ }
+ }
+ }
+
boolean isCarryingRoot() {
return this.carryingRoot;
}
- @Override
boolean isCarryingMeta() {
return this.carryingMeta;
}
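The retry loop in verifyAndAssignRootWithRetries() is driven by two configuration keys visible in the code above. A minimal sketch of overriding them; the values shown are simply the code's own defaults, and the timeout doubles as the sleep between attempts:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Tune how hard the master tries before aborting: the number of
// verifyAndAssignRoot attempts and the per-attempt verification timeout.
Configuration conf = HBaseConfiguration.create();
conf.setInt("hbase.catalog.verification.retries", 10);
conf.setLong("hbase.catalog.verification.timeout", 1000L);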
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Wed Feb 13 20:58:23 2013
@@ -55,10 +55,10 @@ import org.apache.zookeeper.KeeperExcept
@InterfaceAudience.Private
public class ServerShutdownHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
- private final ServerName serverName;
- private final MasterServices services;
- private final DeadServer deadServers;
- private final boolean shouldSplitHlog; // whether to split HLog or not
+ protected final ServerName serverName;
+ protected final MasterServices services;
+ protected final DeadServer deadServers;
+ protected final boolean shouldSplitHlog; // whether to split HLog or not
public ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
@@ -91,63 +91,6 @@ public class ServerShutdownHandler exten
}
/**
- * Before assign the ROOT region, ensure it haven't
- * been assigned by other place
- * <p>
- * Under some scenarios, the ROOT region can be opened twice, so it seemed online
- * in two regionserver at the same time.
- * If the ROOT region has been assigned, so the operation can be canceled.
- * @throws InterruptedException
- * @throws IOException
- * @throws KeeperException
- */
- private void verifyAndAssignRoot()
- throws InterruptedException, IOException, KeeperException {
- long timeout = this.server.getConfiguration().
- getLong("hbase.catalog.verification.timeout", 1000);
- if (!this.server.getCatalogTracker().verifyRootRegionLocation(timeout)) {
- this.services.getAssignmentManager().assignRoot();
- }
- }
-
- /**
- * Failed many times, shutdown processing
- * @throws IOException
- */
- private void verifyAndAssignRootWithRetries() throws IOException {
- int iTimes = this.server.getConfiguration().getInt(
- "hbase.catalog.verification.retries", 10);
-
- long waitTime = this.server.getConfiguration().getLong(
- "hbase.catalog.verification.timeout", 1000);
-
- int iFlag = 0;
- while (true) {
- try {
- verifyAndAssignRoot();
- break;
- } catch (KeeperException e) {
- this.server.abort("In server shutdown processing, assigning root", e);
- throw new IOException("Aborting", e);
- } catch (Exception e) {
- if (iFlag >= iTimes) {
- this.server.abort("verifyAndAssignRoot failed after" + iTimes
- + " times retries, aborting", e);
- throw new IOException("Aborting", e);
- }
- try {
- Thread.sleep(waitTime);
- } catch (InterruptedException e1) {
- LOG.warn("Interrupted when is the thread sleep", e1);
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted", e1);
- }
- iFlag++;
- }
- }
- }
-
- /**
* @return True if the server we are processing was carrying <code>-ROOT-</code>
*/
boolean isCarryingRoot() {
@@ -182,30 +125,13 @@ public class ServerShutdownHandler exten
LOG.info("Skipping log splitting for " + serverName);
}
} catch (IOException ioe) {
- this.services.getExecutorService().submit(this);
+      // typecast to SSH to make sure that the SSH instance (not MSSH or some
+      // other subclass of SSH) is what gets submitted
+ this.services.getExecutorService().submit((ServerShutdownHandler)this);
this.deadServers.add(serverName);
throw new IOException("failed log splitting for " +
serverName + ", will retry", ioe);
}
-
- // Assign root and meta if we were carrying them.
- if (isCarryingRoot()) { // -ROOT-
- LOG.info("Server " + serverName +
- " was carrying ROOT. Trying to assign.");
- this.services.getAssignmentManager().
- regionOffline(HRegionInfo.ROOT_REGIONINFO);
- verifyAndAssignRootWithRetries();
- }
-
- // Carrying meta?
- if (isCarryingMeta()) {
- LOG.info("Server " + serverName +
- " was carrying META. Trying to assign.");
- this.services.getAssignmentManager().
- regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
- this.services.getAssignmentManager().assignMeta();
- }
-
// We don't want worker thread in the MetaServerShutdownHandler
// executor pool to block by waiting availability of -ROOT-
// and .META. server. Otherwise, it could run into the following issue:
@@ -430,7 +356,7 @@ public class ServerShutdownHandler exten
if (daughter == null) return 0;
if (isDaughterMissing(catalogTracker, daughter)) {
LOG.info("Fixup; missing daughter " + daughter.getRegionNameAsString());
- MetaEditor.addDaughter(catalogTracker, daughter, null);
+ MetaEditor.addDaughter(catalogTracker, daughter, null, HConstants.NO_SEQNUM);
// TODO: Log WARN if the regiondir does not exist in the fs. If its not
// there then something wonky about the split -- things will keep going
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java Wed Feb 13 20:58:23 2013
@@ -46,6 +46,7 @@ public class SplitRegionHandler extends
/**
* For testing only! Set to true to skip handling of split.
*/
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
public static boolean TEST_SKIP = false;
public SplitRegionHandler(Server server,
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java Wed Feb 13 20:58:23 2013
@@ -58,15 +58,13 @@ public class TableAddFamilyHandler exten
if(cpHost != null){
cpHost.preAddColumnHandler(this.tableName, this.familyDesc);
}
- // Update table descriptor in HDFS
- HTableDescriptor htd = this.masterServices.getMasterFileSystem()
- .addColumn(tableName, familyDesc);
- // Update in-memory descriptor cache
- this.masterServices.getTableDescriptors().add(htd);
+ // Update table descriptor
+ this.masterServices.getMasterFileSystem().addColumn(tableName, familyDesc);
if(cpHost != null){
cpHost.postAddColumnHandler(this.tableName, this.familyDesc);
}
}
+
@Override
public String toString() {
String name = "UnknownServerName";
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java Wed Feb 13 20:58:23 2013
@@ -53,11 +53,8 @@ public class TableDeleteFamilyHandler ex
if (cpHost != null) {
cpHost.preDeleteColumnHandler(this.tableName, this.familyName);
}
- // Update table descriptor in HDFS
- HTableDescriptor htd =
- this.masterServices.getMasterFileSystem().deleteColumn(tableName, familyName);
- // Update in-memory descriptor cache
- this.masterServices.getTableDescriptors().add(htd);
+ // Update table descriptor
+ this.masterServices.getMasterFileSystem().deleteColumn(tableName, familyName);
// Remove the column family from the file system
MasterFileSystem mfs = this.masterServices.getMasterFileSystem();
for (HRegionInfo hri : hris) {