Posted to commits@hbase.apache.org by li...@apache.org on 2014/03/12 22:17:20 UTC
svn commit: r1576909 [2/18] - in /hbase/branches/0.89-fb/src: ./
examples/thrift/ main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/avro/
main/java/org/apache/hadoop/hbase/avro/generated/
main/java/org/apache/hadoop/hbase/client/ mai...
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/AvroServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/AvroServer.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/AvroServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/AvroServer.java Wed Mar 12 21:17:13 2014
@@ -135,7 +135,7 @@ public class AvroServer {
// TODO(hammer): figure out appropriate setting of maxSize for htablePool
/**
* Constructs an HBaseImpl object.
- *
+ *
* @throws MasterNotRunningException
*/
HBaseImpl(Configuration conf) throws MasterNotRunningException {
@@ -251,7 +251,7 @@ public class AvroServer {
// Table admin
//
- public Void createTable(ATableDescriptor table) throws AIOError,
+ public Void createTable(ATableDescriptor table) throws AIOError,
AIllegalArgument,
ATableExists,
AMasterNotRunning {
@@ -313,7 +313,7 @@ public class AvroServer {
throw ioe;
}
}
-
+
public Void disableTable(ByteBuffer table) throws AIOError {
try {
admin.disableTable(Bytes.toBytes(table));
@@ -324,7 +324,7 @@ public class AvroServer {
throw ioe;
}
}
-
+
// NB: Asynchronous operation
public Void flush(ByteBuffer table) throws AIOError {
try {
@@ -355,7 +355,7 @@ public class AvroServer {
public Void addFamily(ByteBuffer table, AFamilyDescriptor family) throws AIOError {
try {
- admin.addColumn(Bytes.toBytes(table),
+ admin.addColumn(Bytes.toBytes(table),
AvroUtil.afdToHCD(family));
return null;
} catch (IOException e) {
@@ -404,7 +404,7 @@ public class AvroServer {
try {
return AvroUtil.resultToAResult(htable.get(AvroUtil.agetToGet(aget)));
} catch (IOException e) {
- AIOError ioe = new AIOError();
+ AIOError ioe = new AIOError();
ioe.message = new Utf8(e.getMessage());
throw ioe;
} finally {
@@ -417,7 +417,7 @@ public class AvroServer {
try {
return htable.exists(AvroUtil.agetToGet(aget));
} catch (IOException e) {
- AIOError ioe = new AIOError();
+ AIOError ioe = new AIOError();
ioe.message = new Utf8(e.getMessage());
throw ioe;
} finally {
@@ -445,7 +445,7 @@ public class AvroServer {
htable.delete(AvroUtil.adeleteToDelete(adelete));
return null;
} catch (IOException e) {
- AIOError ioe = new AIOError();
+ AIOError ioe = new AIOError();
ioe.message = new Utf8(e.getMessage());
throw ioe;
} finally {
@@ -476,7 +476,7 @@ public class AvroServer {
Scan scan = AvroUtil.ascanToScan(ascan);
return addScanner(htable.getScanner(scan));
} catch (IOException e) {
- AIOError ioe = new AIOError();
+ AIOError ioe = new AIOError();
ioe.message = new Utf8(e.getMessage());
throw ioe;
} finally {
@@ -488,7 +488,7 @@ public class AvroServer {
try {
ResultScanner scanner = getScanner(scannerId);
if (scanner == null) {
- AIllegalArgument aie = new AIllegalArgument();
+ AIllegalArgument aie = new AIllegalArgument();
aie.message = new Utf8("scanner ID is invalid: " + scannerId);
throw aie;
}
@@ -496,7 +496,7 @@ public class AvroServer {
removeScanner(scannerId);
return null;
} catch (IOException e) {
- AIOError ioe = new AIOError();
+ AIOError ioe = new AIOError();
ioe.message = new Utf8(e.getMessage());
throw ioe;
}
@@ -506,14 +506,14 @@ public class AvroServer {
try {
ResultScanner scanner = getScanner(scannerId);
if (scanner == null) {
- AIllegalArgument aie = new AIllegalArgument();
+ AIllegalArgument aie = new AIllegalArgument();
aie.message = new Utf8("scanner ID is invalid: " + scannerId);
throw aie;
}
Result[] results = null;
return AvroUtil.resultsToAResults(scanner.next(numberOfRows));
} catch (IOException e) {
- AIOError ioe = new AIOError();
+ AIOError ioe = new AIOError();
ioe.message = new Utf8(e.getMessage());
throw ioe;
}
@@ -527,7 +527,7 @@ public class AvroServer {
private static void printUsageAndExit() {
printUsageAndExit(null);
}
-
+
private static void printUsageAndExit(final String message) {
if (message != null) {
System.err.println(message);
@@ -563,7 +563,7 @@ public class AvroServer {
"bin/hbase-daemon.sh stop avro or send a kill signal to " +
"the Avro server pid");
}
-
+
// Print out usage if we get to here.
printUsageAndExit();
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/AvroUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/AvroUtil.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/AvroUtil.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/AvroUtil.java Wed Mar 12 21:17:13 2014
@@ -18,11 +18,10 @@
package org.apache.hadoop.hbase.avro;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -31,13 +30,6 @@ import org.apache.hadoop.hbase.HServerIn
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.avro.generated.AClusterStatus;
import org.apache.hadoop.hbase.avro.generated.AColumn;
import org.apache.hadoop.hbase.avro.generated.AColumnValue;
@@ -55,11 +47,18 @@ import org.apache.hadoop.hbase.avro.gene
import org.apache.hadoop.hbase.avro.generated.AServerInfo;
import org.apache.hadoop.hbase.avro.generated.AServerLoad;
import org.apache.hadoop.hbase.avro.generated.ATableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.util.Utf8;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
public class AvroUtil {
@@ -77,7 +76,7 @@ public class AvroUtil {
static public ARegionLoad hrlToARL(HServerLoad.RegionLoad rl) throws IOException {
ARegionLoad arl = new ARegionLoad();
- arl.memStoreSizeMB = rl.getMemStoreSizeMB();
+ arl.memStoreSizeMB = rl.getMemstoreSizeMB();
arl.name = ByteBuffer.wrap(rl.getName());
arl.storefileIndexSizeMB = rl.getStorefileIndexSizeMB();
arl.storefiles = rl.getStorefiles();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/generated/ACompressionAlgorithm.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/generated/ACompressionAlgorithm.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/generated/ACompressionAlgorithm.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/generated/ACompressionAlgorithm.java Wed Mar 12 21:17:13 2014
@@ -1,6 +1,6 @@
package org.apache.hadoop.hbase.avro.generated;
@SuppressWarnings("all")
-public enum ACompressionAlgorithm {
+public enum ACompressionAlgorithm {
LZO, GZ, NONE
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/package.html
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/package.html?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/package.html (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/avro/package.html Wed Mar 12 21:17:13 2014
@@ -25,7 +25,7 @@ Provides an HBase <a href="http://avro.a
This directory contains an Avro interface definition file for an HBase RPC
service and a Java server implementation.
-<h2><a name="whatisavro">What is Avro?</a></h2>
+<h2><a name="whatisavro">What is Avro?</a></h2>
<p>Avro is a data serialization and RPC system. For more, see the
<a href="http://avro.apache.org/docs/current/spec.html">current specification</a>.
@@ -48,9 +48,9 @@ types, and RPC utility files are checked
</pre>
</p>
-<p>The 'avro-tools-x.y.z.jar' jarfile is an Avro utility, and it is
-distributed as a part of the Avro package. Additionally, specific
-language runtime libraries are apart of the Avro package. A version of the
+<p>The 'avro-tools-x.y.z.jar' jarfile is an Avro utility, and it is
+distributed as a part of the Avro package. Additionally, specific
+language runtime libraries are a part of the Avro package. A version of the
Java runtime is listed as a dependency in Maven.
</p>
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java Wed Mar 12 21:17:13 2014
@@ -20,15 +20,6 @@
package org.apache.hadoop.hbase.client;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -49,6 +40,15 @@ import org.apache.hadoop.hbase.util.Daem
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/**
* ResultScanner used to read KeyValue pairs from similar to the ClientScanner
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientZKConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientZKConnection.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientZKConnection.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientZKConnection.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,135 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+/**
+ * This class is responsible for handling connection and reconnection
+ * to a ZooKeeper quorum.
+ */
+public class ClientZKConnection implements Abortable, Watcher {
+
+ private static String ZK_INSTANCE_NAME = HConnectionManager.class.getSimpleName();
+ static final Log LOG = LogFactory.getLog(ClientZKConnection.class);
+ private ZooKeeperWrapper zooKeeperWrapper;
+ private Configuration conf;
+ private boolean aborted = false;
+ private int reconnectionTimes = 0;
+ private int maxReconnectionTimes = 0;
+
+ /**
+ * Create a ClientZKConnection
+ *
+ * @param conf configuration
+ */
+ public ClientZKConnection(Configuration conf) {
+ this.conf = conf;
+ maxReconnectionTimes = conf.getInt(
+ "hbase.client.max.zookeeper.reconnection", 3);
+ }
+
+ /**
+ * Get the zookeeper wrapper for this connection, instantiate it if necessary.
+ *
+ * @return zooKeeperWrapper
+ * @throws java.io.IOException if a remote or network exception occurs
+ */
+ public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
+ if (zooKeeperWrapper == null) {
+ if (this.reconnectionTimes < this.maxReconnectionTimes) {
+ zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf,
+ ZK_INSTANCE_NAME, this);
+ } else {
+ String msg = "HBase client failed to connect to ZooKeeper after "
+ + maxReconnectionTimes + " attempts";
+ LOG.fatal(msg);
+ throw new IOException(msg);
+ }
+ }
+ return zooKeeperWrapper;
+ }
+
+ /**
+ * Close this connection to zookeeper.
+ */
+ synchronized void closeZooKeeperConnection() {
+ if (zooKeeperWrapper != null) {
+ zooKeeperWrapper.close();
+ zooKeeperWrapper = null;
+ }
+ }
+
+ /**
+ * Reset this connection to zookeeper.
+ *
+ * @throws IOException If there is any exception when reconnect to zookeeper
+ */
+ private synchronized void resetZooKeeperConnection() throws IOException {
+ // close the zookeeper connection first
+ closeZooKeeperConnection();
+ // reconnect to zookeeper
+ zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, ZK_INSTANCE_NAME,
+ this);
+ }
+
+ @Override
+ public synchronized void process(WatchedEvent event) {
+ LOG.debug("Received ZK WatchedEvent: " + "[path=" + event.getPath() + "] "
+ + "[state=" + event.getState().toString() + "] " + "[type="
+ + event.getType().toString() + "]");
+ if (event.getType() == EventType.None
+ && event.getState() == KeeperState.SyncConnected) {
+ LOG.info("Reconnected to ZooKeeper");
+ // reset the reconnection times
+ reconnectionTimes = 0;
+ }
+ }
+
+ @Override
+ public void abort(final String msg, Throwable t) {
+ if (t != null && t instanceof KeeperException.SessionExpiredException) {
+ try {
+ reconnectionTimes++;
+ LOG.info("This client just lost its session with ZooKeeper, "
+ + "reconnect attempt number " + reconnectionTimes + ".");
+ // reconnect to zookeeper if possible
+ resetZooKeeperConnection();
+
+ LOG.info("Reconnected successfully. This disconnect could have been"
+ + " caused by a network partition or a long-running GC pause;"
+ + " either way, it is recommended that you verify your "
+ + "environment.");
+ this.aborted = false;
+ return;
+ } catch (IOException e) {
+ LOG.error("Could not reconnect to ZooKeeper after session"
+ + " expiration, aborting");
+ t = e;
+ }
+ }
+ if (t != null)
+ LOG.fatal(msg, t);
+ else
+ LOG.fatal(msg);
+
+ this.aborted = true;
+ closeZooKeeperConnection();
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+}
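For illustration, a minimal sketch of how a client might use the ClientZKConnection added above; the Configuration setup and error handling here are illustrative assumptions, not part of this commit:

    // Hypothetical usage sketch, assuming an HBase-aware Configuration.
    Configuration conf = new Configuration();
    // Reconnection attempts are bounded by this key (default 3, per the constructor above).
    conf.setInt("hbase.client.max.zookeeper.reconnection", 3);
    ClientZKConnection zkConnection = new ClientZKConnection(conf);
    try {
      // Lazily creates the ZooKeeperWrapper on first call.
      ZooKeeperWrapper wrapper = zkConnection.getZooKeeperWrapper();
      // ... read or watch znodes through the wrapper ...
    } catch (IOException e) {
      // Thrown, for example, once the reconnection budget has been exhausted.
    }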
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java Wed Mar 12 21:17:13 2014
@@ -24,6 +24,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
/**
@@ -66,16 +71,18 @@ import com.google.common.base.Preconditi
* deleteFamily -- then you need to use the method overrides that take a
* timestamp. The constructor timestamp is not referenced.
*/
+@ThriftStruct
public class Delete extends Mutation
implements Writable, Comparable<Row> {
private static final byte DELETE_VERSION = (byte)3;
private static final int ADDED_WRITE_TO_WAL_VERSION = 3;
private static final int ADDED_ATTRIBUTES_VERSION = 2;
+ private static final Delete dummyDelete = new Delete();
/** Constructor for Writable. DO NOT USE */
public Delete() {
- this((byte [])null);
+ this((byte[])null);
}
/**
@@ -86,7 +93,7 @@ public class Delete extends Mutation
* families).
* @param row row key
*/
- public Delete(byte [] row) {
+ public Delete(byte[] row) {
this(row, HConstants.LATEST_TIMESTAMP, null);
}
@@ -104,7 +111,7 @@ public class Delete extends Mutation
* @param timestamp maximum version timestamp (only for delete row)
* @param rowLock previously acquired row lock, or null
*/
- public Delete(byte [] row, long timestamp, RowLock rowLock) {
+ public Delete(byte[] row, long timestamp, RowLock rowLock) {
this.row = row;
this.ts = timestamp;
if (rowLock != null) {
@@ -131,11 +138,73 @@ public class Delete extends Mutation
* @param family family name
* @return this for invocation chaining
*/
- public Delete deleteFamily(byte [] family) {
+ public Delete deleteFamily(byte[] family) {
this.deleteFamily(family, HConstants.LATEST_TIMESTAMP);
return this;
}
+ @ThriftConstructor
+ public Delete(@ThriftField(1) final byte[] row,
+ @ThriftField(2) final long timeStamp,
+ @ThriftField(3) final Map<byte[], List<KeyValue>> familyMapSerial,
+ @ThriftField(4) final long lockId,
+ @ThriftField(5) final boolean writeToWAL) {
+ this(row, timeStamp, null);
+ this.lockId = lockId;
+ this.writeToWAL = writeToWAL;
+ // Need to do this, since familyMapSerial might be a HashMap, whereas we
+ // need a TreeMap that uses Bytes.BYTES_COMPARATOR, as specified in
+ // the Mutation class.
+ this.familyMap.putAll(familyMapSerial);
+ }
+
+ @Override
+ @ThriftField(1)
+ public byte[] getRow() {
+ return this.row;
+ }
+
+ @Override
+ @ThriftField(2)
+ public long getTimeStamp() {
+ return this.ts;
+ }
+
+ /**
+ * Method for retrieving the delete's familyMap
+ * @return familyMap
+ */
+ @Override
+ @ThriftField(3)
+ public Map<byte[], List<KeyValue>> getFamilyMap() {
+ return this.familyMap;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ @ThriftField(4)
+ public long getLockId() {
+ return this.lockId;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ @ThriftField(5)
+ public boolean getWriteToWAL() {
+ return this.writeToWAL;
+ }
+
+ /**
+ * Static factory method that returns a dummy delete.
+ */
+ static Delete createDummyDelete() {
+ return Delete.dummyDelete;
+ }
+
/**
* Delete all columns of the specified family with a timestamp less than
* or equal to the specified timestamp.
@@ -146,7 +215,7 @@ public class Delete extends Mutation
* @param timestamp maximum version timestamp
* @return this for invocation chaining
*/
- public Delete deleteFamily(byte [] family, long timestamp) {
+ public Delete deleteFamily(byte[] family, long timestamp) {
List<KeyValue> list = familyMap.get(family);
if(list == null) {
list = new ArrayList<KeyValue>();
@@ -158,13 +227,17 @@ public class Delete extends Mutation
return this;
}
+ public boolean isDummy() {
+ return this.row == null || this.row.length == 0;
+ }
+
/**
* Delete all versions of the specified column.
* @param family family name
* @param qualifier column qualifier
* @return this for invocation chaining
*/
- public Delete deleteColumns(byte [] family, byte [] qualifier) {
+ public Delete deleteColumns(byte[] family, byte[] qualifier) {
this.deleteColumns(family, qualifier, HConstants.LATEST_TIMESTAMP);
return this;
}
@@ -177,7 +250,7 @@ public class Delete extends Mutation
* @param timestamp maximum version timestamp
* @return this for invocation chaining
*/
- public Delete deleteColumns(byte [] family, byte [] qualifier, long timestamp) {
+ public Delete deleteColumns(byte[] family, byte[] qualifier, long timestamp) {
List<KeyValue> list = familyMap.get(family);
if (list == null) {
list = new ArrayList<KeyValue>();
@@ -197,7 +270,7 @@ public class Delete extends Mutation
* @param qualifier column qualifier
* @return this for invocation chaining
*/
- public Delete deleteColumn(byte [] family, byte [] qualifier) {
+ public Delete deleteColumn(byte[] family, byte[] qualifier) {
this.deleteColumn(family, qualifier, HConstants.LATEST_TIMESTAMP);
return this;
}
@@ -209,7 +282,7 @@ public class Delete extends Mutation
* @param timestamp version timestamp
* @return this for invocation chaining
*/
- public Delete deleteColumn(byte [] family, byte [] qualifier, long timestamp) {
+ public Delete deleteColumn(byte[] family, byte[] qualifier, long timestamp) {
List<KeyValue> list = familyMap.get(family);
if(list == null) {
list = new ArrayList<KeyValue>();
@@ -220,15 +293,8 @@ public class Delete extends Mutation
return this;
}
- /**
- * Method for retrieving the delete's familyMap
- * @return familyMap
- */
- public Map<byte [], List<KeyValue>> getFamilyMap() {
- return this.familyMap;
- }
-
//Writable
+ @Override
public void readFields(final DataInput in) throws IOException {
int version = in.readByte();
if (version > DELETE_VERSION) {
@@ -243,7 +309,7 @@ public class Delete extends Mutation
this.familyMap.clear();
int numFamilies = in.readInt();
for(int i=0;i<numFamilies;i++) {
- byte [] family = Bytes.readByteArray(in);
+ byte[] family = Bytes.readByteArray(in);
int numColumns = in.readInt();
List<KeyValue> list = new ArrayList<KeyValue>(numColumns);
for(int j=0;j<numColumns;j++) {
@@ -258,6 +324,7 @@ public class Delete extends Mutation
}
}
+ @Override
public void write(final DataOutput out) throws IOException {
int version = 1;
if (!getAttributesMap().isEmpty()) {
@@ -275,7 +342,7 @@ public class Delete extends Mutation
out.writeBoolean(writeToWAL);
}
out.writeInt(familyMap.size());
- for(Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
+ for(Map.Entry<byte[], List<KeyValue>> entry : familyMap.entrySet()) {
Bytes.writeByteArray(out, entry.getKey());
List<KeyValue> list = entry.getValue();
out.writeInt(list.size());
@@ -297,8 +364,9 @@ public class Delete extends Mutation
* @deprecated use {@link #deleteColumn(byte[], byte[], long)} instead
* @return this for invocation chaining
*/
- public Delete deleteColumns(byte [] column, long timestamp) {
- byte [][] parts = KeyValue.parseColumn(column);
+ @Deprecated
+ public Delete deleteColumns(byte[] column, long timestamp) {
+ byte[][] parts = KeyValue.parseColumn(column);
this.deleteColumns(parts[0], parts[1], timestamp);
return this;
}
@@ -310,8 +378,9 @@ public class Delete extends Mutation
* @deprecated use {@link #deleteColumn(byte[], byte[])} instead
* @return this for invocation chaining
*/
- public Delete deleteColumn(byte [] column) {
- byte [][] parts = KeyValue.parseColumn(column);
+ @Deprecated
+ public Delete deleteColumn(byte[] column) {
+ byte[][] parts = KeyValue.parseColumn(column);
this.deleteColumn(parts[0], parts[1], HConstants.LATEST_TIMESTAMP);
return this;
}
@@ -337,9 +406,87 @@ public class Delete extends Mutation
* @param timestamp will delete data at this timestamp or older
*/
public void deleteRow(long timestamp) {
- Preconditions.checkArgument(familyMap.isEmpty(),
- "Cannot delete entire row, column families already specified");
+ Preconditions.checkArgument(familyMap.isEmpty(),
+ "Cannot delete entire row, column families already specified");
ts = timestamp;
}
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(row, ts, lockId, familyMap);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Delete other = (Delete) obj;
+ if (ts != other.getTimeStamp()) {
+ return false;
+ }
+ if (writeToWAL != other.getWriteToWAL()) {
+ return false;
+ }
+ if (lockId != other.getLockId()) {
+ return false;
+ }
+ if (!Arrays.equals(other.row, this.getRow())) {
+ return false;
+ }
+ if (familyMap == null && other.getFamilyMap() != null) {
+ return false;
+ } else if (familyMap != null && other.getFamilyMap() == null) {
+ return false;
+ }
+ return familyMap.size() == other.getFamilyMap().size() &&
+ familyMap.entrySet().containsAll(other.getFamilyMap().entrySet());
+ }
+
+ public static class Builder {
+ private byte[] row;
+ private long timeStamp;
+ private Map<byte[], List<KeyValue>> familyMap;
+ private long lockId;
+ private boolean writeToWAL;
+
+ public Builder() {
+ }
+
+ public Builder setRow(byte[] row) {
+ this.row = row;
+ return this;
+ }
+
+ public Builder setTimeStamp(long ts) {
+ this.timeStamp = ts;
+ return this;
+ }
+
+ public Builder setFamilyMap(Map<byte[], List<KeyValue>> familyMap) {
+ this.familyMap = familyMap;
+ return this;
+ }
+
+ public Builder setLockId(long lockId) {
+ this.lockId = lockId;
+ return this;
+ }
+
+ public Builder setWriteToWAL(boolean writeToWAL) {
+ this.writeToWAL = writeToWAL;
+ return this;
+ }
+
+ public Delete create() {
+ return new Delete(row, timeStamp, familyMap, lockId, writeToWAL);
+ }
+ }
+
}
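A minimal usage sketch for the Delete.Builder added above; the row and timestamp values are made up. Note that the Thrift constructor invoked by create() calls putAll on the supplied family map, so a non-null map must be provided:

    // Hypothetical usage sketch for the new Builder.
    Map<byte[], List<KeyValue>> familyMap =
        new TreeMap<byte[], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
    Delete d = new Delete.Builder()
        .setRow(Bytes.toBytes("row1"))
        .setTimeStamp(HConstants.LATEST_TIMESTAMP)
        .setFamilyMap(familyMap)   // required: create() copies it via putAll
        .setLockId(-1L)
        .setWriteToWAL(true)
        .create();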
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java Wed Mar 12 21:17:13 2014
@@ -19,27 +19,38 @@
*/
package org.apache.hadoop.hbase.client;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.TFilter;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.base.Objects;
+
/**
* Used to perform Get operations on a single row.
* <p>
@@ -67,22 +78,26 @@ import java.util.TreeSet;
* <p>
* To add a filter, execute {@link #setFilter(Filter) setFilter}.
*/
+
+@ThriftStruct
public class Get extends OperationWithAttributes
implements Writable, Row, Comparable<Row> {
+ private static final Log LOG = LogFactory.getLog(Get.class);
private static final byte STORE_LIMIT_VERSION = (byte) 2;
private static final byte STORE_OFFSET_VERSION = (byte) 3;
private static final byte FLASHBACK_VERSION = (byte) 4;
private static final byte GET_VERSION = FLASHBACK_VERSION;
- private byte [] row = null;
+ private byte[] row = null;
private long lockId = -1L;
private int maxVersions = 1;
private int storeLimit = -1;
private int storeOffset = 0;
private Filter filter = null;
+ private TFilter tFilter = null;
private TimeRange tr = new TimeRange();
- private Map<byte [], NavigableSet<byte []>> familyMap =
- new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+ private Map<byte[], NavigableSet<byte[]>> familyMap =
+ new TreeMap<byte[], NavigableSet<byte[]>>(Bytes.BYTES_COMPARATOR);
// Operation should be performed as if it was performed at the given ts.
private long effectiveTS = HConstants.LATEST_TIMESTAMP;
@@ -96,7 +111,7 @@ public class Get extends OperationWithAt
* all columns in all families of the specified row.
* @param row row key
*/
- public Get(byte [] row) {
+ public Get(byte[] row) {
this(row, null);
}
@@ -108,7 +123,7 @@ public class Get extends OperationWithAt
* @param row row key
* @param rowLock previously acquired row lock, or null
*/
- public Get(byte [] row, RowLock rowLock) {
+ public Get(byte[] row, RowLock rowLock) {
this.row = row;
if(rowLock != null) {
this.lockId = rowLock.getLockId();
@@ -116,15 +131,58 @@ public class Get extends OperationWithAt
}
/**
- * Get all columns from the specified family.
- * <p>
+ * Thrift Constructor
+ * TODO: add Filter annotations!
+ * @param row
+ * @param lockId
+ * @param maxVersions
+ * @param storeLimit
+ * @param storeOffset
+ * @param tr
+ * @param familyMap
+ * @param effectiveTS
+ */
+ @ThriftConstructor
+ public Get(
+ @ThriftField(1) byte[] row,
+ @ThriftField(2) long lockId,
+ @ThriftField(3) int maxVersions,
+ @ThriftField(4) int storeLimit,
+ @ThriftField(5) int storeOffset,
+ @ThriftField(6) TimeRange tr,
+ @ThriftField(7) Map<byte[], Set<byte[]>> familyMap,
+ @ThriftField(8) long effectiveTS,
+ @ThriftField(9) TFilter tFilter){
+ this.row = row;
+ this.lockId = lockId;
+ this.maxVersions = maxVersions;
+ this.storeLimit = storeLimit;
+ this.storeOffset = storeOffset;
+ this.effectiveTS = effectiveTS;
+ this.tr = tr;
+ this.tFilter = tFilter;
+ this.filter = tFilter;
+ if (familyMap != null) {
+ for (Entry<byte[], Set<byte[]>> entry : familyMap.entrySet()) {
+ NavigableSet<byte[]> set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ if (entry.getValue() != null) {
+ set.addAll(entry.getValue());
+ }
+ this.familyMap.put(entry.getKey(), set);
+ }
+ }
+ }
+
+
+ /**
* Overrides previous calls to addColumn for this family.
* @param family family name
* @return the Get object
+ * @deprecated use {@link Builder#addFamily(byte[])} instead.
*/
- public Get addFamily(byte [] family) {
- familyMap.remove(family);
- familyMap.put(family, null);
+ @Deprecated
+ public Get addFamily(byte[] family) {
+ familyMap.put(family, new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
return this;
}
@@ -135,11 +193,13 @@ public class Get extends OperationWithAt
* @param family family name
* @param qualifier column qualifier
* @return the Get object
+ * @deprecated use {@link Builder#addColumn(byte[], byte[])} instead.
*/
- public Get addColumn(byte [] family, byte [] qualifier) {
- NavigableSet<byte []> set = familyMap.get(family);
+ @Deprecated
+ public Get addColumn(byte[] family, byte[] qualifier) {
+ NavigableSet<byte[]> set = familyMap.get(family);
if (set == null) {
- set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
}
if (qualifier == null) {
set.add(HConstants.EMPTY_BYTE_ARRAY);
@@ -239,7 +299,14 @@ public class Get extends OperationWithAt
* @return this for invocation chaining
*/
public Get setFilter(Filter filter) {
- this.filter = filter;
+ try {
+ this.tFilter = TFilter.getTFilter(filter);
+ } catch (IOException e) {
+ LOG.error("Caught IOException in serializing filter." +
+ " Cannot continue.");
+ throw new RuntimeException(e);
+ }
+ this.filter = tFilter;
return this;
}
@@ -248,15 +315,27 @@ public class Get extends OperationWithAt
/**
* @return Filter
*/
+ @ThriftField(9)
+ public TFilter getTFilter() {
+ return this.tFilter;
+ }
+
public Filter getFilter() {
- return this.filter;
+ try {
+ if (tFilter == null) return null;
+ return this.tFilter.getFilter();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
* Method for retrieving the get's row
* @return row
*/
- public byte [] getRow() {
+ @Override
+ @ThriftField(1)
+ public byte[] getRow() {
return this.row;
}
@@ -272,6 +351,7 @@ public class Get extends OperationWithAt
* Method for retrieving the get's lockId
* @return lockId
*/
+ @ThriftField(2)
public long getLockId() {
return this.lockId;
}
@@ -280,10 +360,21 @@ public class Get extends OperationWithAt
* Method for retrieving the get's maximum number of version
* @return the maximum number of version to fetch for this get
*/
+ @ThriftField(3)
public int getMaxVersions() {
return this.maxVersions;
}
+ @ThriftField(4)
+ public int getStoreLimit() {
+ return this.storeLimit;
+ }
+
+ @ThriftField(5)
+ public int getStoreOffset() {
+ return this.storeOffset;
+ }
+
/**
* @return the effective timestamp of this operation.
*/
@@ -313,6 +404,7 @@ public class Get extends OperationWithAt
* Method for retrieving the get's TimeRange
* @return timeRange
*/
+ @ThriftField(6)
public TimeRange getTimeRange() {
return this.tr;
}
@@ -345,8 +437,27 @@ public class Get extends OperationWithAt
* Method for retrieving the get's familyMap
* @return familyMap
*/
- public Map<byte[],NavigableSet<byte[]>> getFamilyMap() {
- return this.familyMap;
+ public Map<byte[], NavigableSet<byte[]>> getFamilyMap() {
+ return familyMap;
+ }
+
+
+ @ThriftField(7)
+ public Map<byte[], Set<byte[]>> getSerializableFamilyMap() {
+ Map<byte[], Set<byte[]>> serializableMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
+ for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+ Set<byte[]> serializableSet = new HashSet<byte[]>();
+ for (byte[] setEntry : entry.getValue()) {
+ serializableSet.add(setEntry);
+ }
+ serializableMap.put(entry.getKey(), serializableSet);
+ }
+ return serializableMap;
+ }
+
+ @ThriftField(8)
+ public long geEffectiveTS() {
+ return this.effectiveTS;
}
/**
@@ -358,9 +469,9 @@ public class Get extends OperationWithAt
@Override
public Map<String, Object> getFingerprint() {
Map<String, Object> map = new HashMap<String, Object>();
- List<String> families = new ArrayList<String>();
+ List<String> families = new ArrayList<String>(this.familyMap.size());
map.put("families", families);
- for (Map.Entry<byte [], NavigableSet<byte[]>> entry :
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
this.familyMap.entrySet()) {
families.add(Bytes.toStringBinary(entry.getKey()));
}
@@ -386,13 +497,13 @@ public class Get extends OperationWithAt
map.put("row", Bytes.toStringBinary(this.row));
map.put("maxVersions", this.maxVersions);
map.put("storeLimit", this.storeLimit);
- List<Long> timeRange = new ArrayList<Long>();
+ List<Long> timeRange = new ArrayList<Long>(2);
timeRange.add(this.tr.getMin());
timeRange.add(this.tr.getMax());
map.put("timeRange", timeRange);
int colCount = 0;
// iterate through affected families and add details
- for (Map.Entry<byte [], NavigableSet<byte[]>> entry :
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
this.familyMap.entrySet()) {
List<String> familyList = new ArrayList<String>();
columns.put(Bytes.toStringBinary(entry.getKey()), familyList);
@@ -405,7 +516,7 @@ public class Get extends OperationWithAt
if (maxCols <= 0) {
continue;
}
- for (byte [] column : entry.getValue()) {
+ for (byte[] column : entry.getValue()) {
if (--maxCols <= 0) {
continue;
}
@@ -420,11 +531,13 @@ public class Get extends OperationWithAt
return map;
}
+ @Override
public int compareTo(Row p) {
return Bytes.compareTo(this.getRow(), p.getRow());
}
//Writable
+ @Override
public void readFields(final DataInput in)
throws IOException {
int version = in.readByte();
@@ -452,16 +565,16 @@ public class Get extends OperationWithAt
tr.readFields(in);
int numFamilies = in.readInt();
this.familyMap =
- new TreeMap<byte [],NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+ new TreeMap<byte[],NavigableSet<byte[]>>(Bytes.BYTES_COMPARATOR);
for(int i=0; i<numFamilies; i++) {
- byte [] family = Bytes.readByteArray(in);
+ byte[] family = Bytes.readByteArray(in);
boolean hasColumns = in.readBoolean();
- NavigableSet<byte []> set = null;
+ NavigableSet<byte[]> set = null;
if(hasColumns) {
int numColumns = in.readInt();
- set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
for(int j=0; j<numColumns; j++) {
- byte [] qualifier = Bytes.readByteArray(in);
+ byte[] qualifier = Bytes.readByteArray(in);
set.add(qualifier);
}
}
@@ -469,6 +582,7 @@ public class Get extends OperationWithAt
}
}
+ @Override
public void write(final DataOutput out)
throws IOException {
// We try to talk a protocol version as low as possible so that we can be
@@ -503,16 +617,16 @@ public class Get extends OperationWithAt
}
tr.write(out);
out.writeInt(familyMap.size());
- for(Map.Entry<byte [], NavigableSet<byte []>> entry :
+ for(Map.Entry<byte[], NavigableSet<byte[]>> entry :
familyMap.entrySet()) {
Bytes.writeByteArray(out, entry.getKey());
- NavigableSet<byte []> columnSet = entry.getValue();
+ NavigableSet<byte[]> columnSet = entry.getValue();
if(columnSet == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeInt(columnSet.size());
- for(byte [] qualifier : columnSet) {
+ for(byte[] qualifier : columnSet) {
Bytes.writeByteArray(out, qualifier);
}
}
@@ -538,8 +652,8 @@ public class Get extends OperationWithAt
* @deprecated issue multiple {@link #addColumn(byte[], byte[])} instead
* @return this for invocation chaining
*/
- @SuppressWarnings({"deprecation"})
- public Get addColumns(byte [][] columns) {
+ @Deprecated
+ public Get addColumns(byte[][] columns) {
if (columns == null) return this;
for (byte[] column : columns) {
try {
@@ -556,9 +670,10 @@ public class Get extends OperationWithAt
* @return This.
* @deprecated use {@link #addColumn(byte[], byte[])} instead
*/
- public Get addColumn(final byte [] column) {
+ @Deprecated
+ public Get addColumn(final byte[] column) {
if (column == null) return this;
- byte [][] split = KeyValue.parseColumn(column);
+ byte[][] split = KeyValue.parseColumn(column);
if (split.length > 1 && split[1] != null && split[1].length > 0) {
addColumn(split[0], split[1]);
} else {
@@ -566,4 +681,168 @@ public class Get extends OperationWithAt
}
return this;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(row, lockId, maxVersions, storeLimit, storeOffset,
+ tr, familyMap, effectiveTS);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Get other = (Get) obj;
+ if (effectiveTS != other.effectiveTS)
+ return false;
+ if (familyMap == null) {
+ if (other.familyMap != null)
+ return false;
+ } else if (!familyMap.equals(other.familyMap))
+ return false;
+ if (filter == null) {
+ if (other.filter != null)
+ return false;
+ } else if (!filter.equals(other.filter))
+ return false;
+ if (lockId != other.lockId)
+ return false;
+ if (maxVersions != other.maxVersions)
+ return false;
+ if (!Arrays.equals(row, other.row))
+ return false;
+ if (storeLimit != other.storeLimit)
+ return false;
+ if (storeOffset != other.storeOffset)
+ return false;
+ if (tr == null) {
+ if (other.tr != null)
+ return false;
+ } else if (!tr.equals(other.tr))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "Get [row=" + Arrays.toString(row) + ", lockId=" + lockId
+ + ", maxVersions=" + maxVersions + ", storeLimit=" + storeLimit
+ + ", storeOffset=" + storeOffset + ", filter=" + filter + ", tr=" + tr
+ + ", familyMap=" + familyMap + ", effectiveTS=" + effectiveTS + "]";
+ }
+
+ /**
+ * Builder class for Get
+ */
+ public static class Builder {
+ private byte[] row;
+ private long lockId;
+ private int maxVersions;
+ private int storeLimit;
+ private int storeOffset;
+ private TimeRange tr;
+ private Map<byte[], Set<byte[]>> familyMap;
+ private long effectiveTS;
+ private TFilter tFilter;
+
+ public Builder(byte[] row) {
+ this.row = row;
+ this.lockId = -1L;
+ this.maxVersions = 1;
+ this.storeLimit = -1;
+ this.storeOffset = 0;
+ this.tr = new TimeRange();
+ this.effectiveTS = HConstants.LATEST_TIMESTAMP;
+ }
+
+ public Builder setLockId(long lockId) {
+ this.lockId = lockId;
+ return this;
+ }
+
+ public Builder setMaxVersions(int maxVersions) {
+ this.maxVersions = maxVersions;
+ return this;
+ }
+
+ public Builder setMaxVersions() {
+ this.maxVersions = Integer.MAX_VALUE;
+ return this;
+ }
+
+ public Builder setStoreLimit(int storeLimit) {
+ this.storeLimit = storeLimit;
+ return this;
+ }
+
+ public Builder setStoreOffset(int storeOffset) {
+ this.storeOffset = storeOffset;
+ return this;
+ }
+
+ public Builder setTr(TimeRange tr) {
+ this.tr = tr;
+ return this;
+ }
+
+ public Builder setFamilyMap(Map<byte[], Set<byte[]>> familyMap) {
+ this.familyMap = familyMap;
+ return this;
+ }
+
+ /**
+ * Overrides previous calls to addColumn for this family.
+ * @param family family name
+ * @return the Builder object
+ */
+ public Builder addFamily(byte[] family) {
+ if (this.familyMap == null) {
+ this.familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ }
+ this.familyMap.put(family, new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
+ return this;
+ }
+
+ /**
+ * Overrides previous calls to addFamily for this family.
+ * @param family family name
+ * @param qualifier column qualifier
+ * @return the Builder object
+ */
+ public Builder addColumn(byte[] family, byte[] qualifier) {
+ if (this.familyMap == null) {
+ this.familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ }
+ Set<byte[]> set = this.familyMap.get(family);
+ if (set == null) {
+ set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ }
+ if (qualifier == null) {
+ set.add(HConstants.EMPTY_BYTE_ARRAY);
+ } else {
+ set.add(qualifier);
+ }
+ this.familyMap.put(family, set);
+ return this;
+ }
+
+ public Builder setEffectiveTS(long effectiveTS) {
+ this.effectiveTS = effectiveTS;
+ return this;
+ }
+
+ public Builder setFilter(Filter f) throws IOException {
+ this.tFilter = TFilter.getTFilter(f);
+ return this;
+ }
+
+ public Get create() {
+ return new Get(this.row, this.lockId, this.maxVersions, this.storeLimit,
+ this.storeOffset, this.tr, this.familyMap, this.effectiveTS, this.tFilter);
+ }
+ }
}
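A minimal usage sketch for the Get.Builder added above, which supersedes the now-deprecated addFamily/addColumn mutators; the row, column, and filter values are illustrative:

    // Hypothetical usage sketch for the new Builder.
    try {
      Get g = new Get.Builder(Bytes.toBytes("row1"))
          .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"))
          .setMaxVersions(3)
          // setFilter wraps the Filter in a serializable TFilter and may throw IOException.
          .setFilter(new PrefixFilter(Bytes.toBytes("row")))
          .create();
      // Pass g to HTable#get(Get) as before.
    } catch (IOException e) {
      // Filter serialization failed.
    }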
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Wed Mar 12 21:17:13 2014
@@ -450,7 +450,7 @@ public class HBaseAdmin {
scannerId = server.openScanner(
firstMetaServer.getRegionInfo().getRegionName(), scan);
// Get a batch at a time.
- Result [] values = server.next(scannerId, batchCount);
+ Result[] values = server.next(scannerId, batchCount);
if (values == null || values.length == 0) {
break;
}
@@ -1096,7 +1096,8 @@ public class HBaseAdmin {
compactCF(tableOrRegionName, columnFamily, HConstants.Modify.TABLE_COMPACT);
return;
}
- byte [] tableName = HRegionInfo.parseRegionName(tableOrRegionName)[0];
+ @SuppressWarnings("unused")
+ byte[] tableName = HRegionInfo.parseRegionName(tableOrRegionName)[0];
// Perform compaction only if a valid column family was passed.
modifyTable(null, HConstants.Modify.TABLE_COMPACT,
new Object[] {tableOrRegionName, columnFamily});
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java Wed Mar 12 21:17:13 2014
@@ -19,8 +19,6 @@
*/
package org.apache.hadoop.hbase.client;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -61,6 +59,7 @@ import org.apache.hadoop.hbase.HServerIn
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -72,15 +71,15 @@ import org.apache.hadoop.util.StringUtil
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.json.JSONArray;
-import org.json.JSONObject;
import org.json.JSONException;
+import org.json.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
- * Check consistency among the in-memory states of the master and the
+ * Check consistency among the in-memory states of the master and the
* region server(s) and the state of data in HDFS.
*/
public class HBaseFsck {
@@ -104,11 +103,20 @@ public class HBaseFsck {
FixState fix = FixState.NONE; // do we want to try fixing the errors?
private boolean rerun = false; // if we tried to fix something rerun hbck
private static boolean summary = false; // if we want to print less output
- private static boolean checkRegionInfo = false;
+ private static boolean checkRegionInfo = false;
private static boolean promptResponse = false; // "no" to all prompt questions
private int numThreads = MAX_NUM_THREADS;
private final String UnknownTable = "__UnknownTable__";
+ // When the HMaster is processing a dead server shutdown (e.g. log split), hbck
+ // detects the region as unassigned and tries to fix it if the fix option is on.
+ // But that fix clears the assignment in META and immediately triggers a new
+ // assignment, so the region could be assigned to a healthy node before the log
+ // split has completed, and unflushed HLog records could then be stuck forever;
+ // in effect the cluster "loses" that data. Unless this flag is set, hbck will
+ // not clear META assignments that point to a dead region server.
+ private static boolean forceCleanMeta = false;
+
ThreadPoolExecutor executor; // threads to retrieve data from regionservers
private List<WorkItem> asyncWork = Lists.newArrayList();
public static String json = null;
@@ -119,7 +127,7 @@ public class HBaseFsck {
* @param conf Configuration object
* @throws MasterNotRunningException if the master is not running
*/
- public HBaseFsck(Configuration conf)
+ public HBaseFsck(Configuration conf)
throws MasterNotRunningException, IOException {
this.conf = conf;
@@ -134,11 +142,11 @@ public class HBaseFsck {
new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);
}
-
+
public TreeMap<String, HbckInfo> getRegionInfo() {
return this.regionInfo;
}
-
+
public int initAndScanRootMeta() throws IOException {
// print hbase server version
errors.print("Version: " + status.getHBaseVersion());
@@ -147,23 +155,15 @@ public class HBaseFsck {
// Make sure regionInfo is empty before starting
regionInfo.clear();
tablesInfo.clear();
-
+
// get a list of all regions from the master. This involves
// scanning the META table
if (!recordRootRegion()) {
// Will remove later if we can fix it
- errors.reportError("Encountered fatal error. Exitting...");
+ errors.reportError("Encountered fatal error. Exiting...");
return -1;
}
- getMetaEntries();
-
- // Check if .META. is found only once and on the right place
- if (!checkMetaEntries()) {
- // Will remove later if we can fix it
- errors.reportError("Encountered fatal error. Exitting...");
- return -1;
- }
- return 0;
+ return getMetaEntries();
}
/**
@@ -172,7 +172,7 @@ public class HBaseFsck {
* @return 0 on success, non-zero on failure
*/
int doWork() throws IOException, InterruptedException {
-
+
if (initAndScanRootMeta() == -1) {
return -1;
}
@@ -196,7 +196,7 @@ public class HBaseFsck {
// From the master, get a list of all known live region servers
Collection<HServerInfo> regionServers = status.getServerInfo();
- errors.print("Number of live region servers:" +
+ errors.print("Number of live region servers:" +
regionServers.size());
if (details) {
for (HServerInfo rsinfo: regionServers) {
@@ -206,7 +206,7 @@ public class HBaseFsck {
// From the master, get a list of all dead region servers
Collection<String> deadRegionServers = status.getDeadServerNames();
- errors.print("Number of dead region servers:" +
+ errors.print("Number of dead region servers:" +
deadRegionServers.size());
if (details) {
for (String name: deadRegionServers) {
@@ -233,42 +233,42 @@ public class HBaseFsck {
if (checkRegionInfo) {
checkRegionInfo();
}
-
+
// Print table summary
printTableSummary();
return errors.summarize();
}
-
+
/**
- * Read the .regioninfo for all regions of the table and compare with the
- * corresponding entry in .META.
- * Entry in .regioninfo should be consistent with the entry in .META.
- *
+ * Read the .regioninfo for all regions of the table and compare with the
+ * corresponding entry in .META.
+ * Entry in .regioninfo should be consistent with the entry in .META.
+ *
*/
void checkRegionInfo() {
Path tableDir = null;
-
- try {
+
+ try {
for (HbckInfo hbi : regionInfo.values()) {
- tableDir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
+ tableDir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
hbi.metaEntry.getTableDesc().getName());
Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(conf);
- Path regionPath = HRegion.getRegionDir(tableDir,
+ Path regionPath = HRegion.getRegionDir(tableDir,
hbi.metaEntry.getEncodedName());
Path regionInfoPath = new Path(regionPath, HRegion.REGIONINFO_FILE);
if (fs.exists(regionInfoPath) &&
fs.getFileStatus(regionInfoPath).getLen() > 0) {
FSDataInputStream in = fs.open(regionInfoPath);
HRegionInfo f_hri = null;
- try {
+ try {
f_hri = new HRegionInfo();
f_hri.readFields(in);
- } catch (IOException ex) {
- errors.reportError("Could not read .regioninfo file at "
+ } catch (IOException ex) {
+ errors.reportError("Could not read .regioninfo file at "
+ regionInfoPath);
} finally {
in.close();
@@ -276,23 +276,23 @@ public class HBaseFsck {
HbckInfo hbckinfo = regionInfo.get(f_hri.getEncodedName());
HRegionInfo m_hri = hbckinfo.metaEntry;
if(!f_hri.equals(m_hri)) {
- errors.reportError("Table name: " +
- f_hri.getTableDesc().getNameAsString() +
- " RegionInfo for "+ f_hri.getRegionNameAsString() +
+ errors.reportError("Table name: " +
+ f_hri.getTableDesc().getNameAsString() +
+ " RegionInfo for "+ f_hri.getRegionNameAsString() +
" inconsistent in .META. and .regioninfo");
}
} else {
if (!fs.exists(regionInfoPath)) {
- errors.reportError(".regioninfo not found at "
+ errors.reportError(".regioninfo not found at "
+ regionInfoPath.toString());
} else if (fs.getFileStatus(regionInfoPath).getLen() <= 0) {
- errors.reportError(".regioninfo file is empty (path = "
+ errors.reportError(".regioninfo file is empty (path = "
+ regionInfoPath + ")");
}
}
- }
+ }
} catch (IOException e) {
- errors.reportError("Error in comparing .regioninfo and .META."
+ errors.reportError("Error in comparing .regioninfo and .META."
+ e.getMessage());
}
}
@@ -308,7 +308,7 @@ public class HBaseFsck {
// list all tables from HDFS
List<FileStatus> tableDirs = Lists.newArrayList();
-
+
boolean foundVersionFile = false;
FileStatus[] files = fs.listStatus(rootDir);
for (FileStatus file : files) {
@@ -318,7 +318,7 @@ public class HBaseFsck {
tableDirs.add(file);
}
}
-
+
// verify that version file exists
if (!foundVersionFile) {
errors.reportError("Version file does not exist in root dir " + rootDir);
@@ -331,7 +331,7 @@ public class HBaseFsck {
asyncWork.add(work);
}
}
-
+
/**
* Record the location of the ROOT region as found in ZooKeeper,
* as if it were in a META table. This is so that we can check
@@ -340,28 +340,28 @@ public class HBaseFsck {
boolean recordRootRegion() throws IOException {
HRegionLocation rootLocation = connection.locateRegion(
HConstants.ROOT_TABLE_NAME, HConstants.EMPTY_START_ROW);
-
+
// Check if Root region is valid and existing
if (rootLocation == null || rootLocation.getRegionInfo() == null ||
rootLocation.getServerAddress() == null) {
- errors.reportError("Root Region or some of its attributes is null.");
+ errors.reportError("Root Region or some of its attributes is null.");
return false;
}
-
+
MetaEntry m = new MetaEntry(rootLocation.getRegionInfo(),
rootLocation.getServerAddress(), null, System.currentTimeMillis());
HbckInfo hbInfo = new HbckInfo(m);
regionInfo.put(rootLocation.getRegionInfo().getEncodedName(), hbInfo);
return true;
}
-
+
/**
* Contacts each regionserver and fetches metadata about regions.
* @throws IOException if a remote or network exception occurs
*/
void scanRegionServers() throws IOException, InterruptedException {
Collection<HServerInfo> regionServers = status.getServerInfo();
- errors.print("Number of live region servers:" +
+ errors.print("Number of live region servers:" +
regionServers.size());
if (details) {
for (HServerInfo rsinfo: regionServers) {
@@ -372,7 +372,7 @@ public class HBaseFsck {
// finish all async tasks before analyzing what we have
finishAsyncWork();
}
-
+
/**
* Contacts each regionserver and fetches metadata about regions.
* @param regionServerList - the list of region servers to connect to
@@ -409,7 +409,7 @@ public class HBaseFsck {
doConsistencyCheck(hbi);
}
}
-
+
/**
* Check a single region for consistency and correct deployment.
*/
@@ -426,10 +426,10 @@ public class HBaseFsck {
hbi.metaEntry.regionServer.equals(hbi.deployedOn.get(0));
boolean shouldBeDeployed = inMeta && !hbi.metaEntry.isOffline();
long tooRecent = System.currentTimeMillis() - timelag;
- boolean recentlyModified =
+ boolean recentlyModified =
(inHdfs && hbi.foundRegionDir.getModificationTime() > tooRecent) ||
(inMeta && hbi.metaEntry.modTime > tooRecent);
-
+
String tableName = null;
if (inMeta) {
tableName = hbi.metaEntry.getTableDesc().getNameAsString();
@@ -452,20 +452,17 @@ public class HBaseFsck {
} else {
tableInfo.addRegionDetails(RegionType.offline, descriptiveName);
}
- return;
} else if (inMeta && !shouldBeDeployed && !isDeployed) {
// offline regions shouldn't cause complaints
String message = "Region " + descriptiveName + " offline, ignoring.";
LOG.debug(message);
tableInfo.addRegionDetails(RegionType.offline, descriptiveName);
tableInfo.addRegionError(message);
- return;
} else if (recentlyModified) {
String message = "Region " + descriptiveName + " was recently modified -- skipping";
LOG.info(message);
tableInfo.addRegionDetails(RegionType.skipped, descriptiveName);
tableInfo.addRegionError(message);
- return;
}
// ========== Cases where the region is not in META =============
else if (!inMeta && !inHdfs && !isDeployed) {
@@ -516,8 +513,17 @@ public class HBaseFsck {
// If we are trying to fix the errors
tableInfo.addRegionDetails(RegionType.missing, descriptiveName);
tableInfo.addRegionError(message);
- if (fix == FixState.ALL) {
+ if (fix != FixState.NONE) {
errors.print("Trying to fix unassigned region...");
+ if (!hbi.deployedOn.isEmpty()) {
+ String addr = hbi.deployedOn.get(0).getHostAddressWithPort();
+ // Do not clean region assignment in META if a split log is ongoing for its assigned RS.
+ if (status.getDeadServerNames().contains(addr) && !forceCleanMeta) {
+ errors.print("Split log for dead server " + addr + " is possibly ongoing, skip unassigned region "
+ + hbi.metaEntry.getEncodedName());
+ return;
+ }
+ }
if (HBaseFsckRepair.fixUnassigned(this.conf, hbi.metaEntry)) {
setShouldRerun();
}
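
The guard added above is worth reading as a standalone rule: never clear a region's assignment while its last known server may still be having its logs split, unless the operator forces it. A minimal sketch of that predicate (names are illustrative, not from the patch):

import java.util.Set;

final class AssignmentFixGuard {
  // True when hbck may safely clear an unassigned region's entry in META.
  // If the region's last known server is still in the dead-server list,
  // its HLogs may be mid-split and clearing the assignment could lose
  // edits, so we skip unless -forceCleanMeta was given.
  static boolean safeToCleanMeta(String lastKnownServer,
                                 Set<String> deadServers,
                                 boolean forceCleanMeta) {
    return forceCleanMeta || !deadServers.contains(lastKnownServer);
  }
}
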
@@ -531,7 +537,7 @@ public class HBaseFsck {
tableInfo.addRegionError(message);
} else if (inMeta && inHdfs && isMultiplyDeployed) {
String message = "Region " + descriptiveName +
- " is listed in META on region server " + hbi.metaEntry.regionServer +
+ " is listed in META on region server " + hbi.metaEntry.regionServer +
" but is multiply assigned to region servers " +
Joiner.on(", ").join(hbi.deployedOn);
errors.reportFixableError(message);
@@ -545,8 +551,8 @@ public class HBaseFsck {
}
}
} else if (inMeta && inHdfs && isDeployed && !deploymentMatchesMeta) {
- String message = "Region " + descriptiveName +
- " listed in META on region server " + hbi.metaEntry.regionServer +
+ String message = "Region " + descriptiveName +
+ " listed in META on region server " + hbi.metaEntry.regionServer +
" but found on region server " + hbi.deployedOn.get(0);
errors.reportFixableError(message);
tableInfo.addRegionDetails(RegionType.unknown, descriptiveName);
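
Taken together, the branches of doConsistencyCheck() form a small truth table over where a region is recorded (META), where its files live (HDFS), and where it is actually served. A hedged condensation of the branches visible in this diff; the cases elided by the hunk boundaries are omitted rather than guessed at:

// Illustrative only: condenses the visible cases of doConsistencyCheck().
static String classify(boolean inMeta, boolean inHdfs, boolean isDeployed,
                       boolean shouldBeDeployed, boolean recentlyModified,
                       boolean multiplyDeployed, boolean matchesMeta) {
  if (inMeta && !shouldBeDeployed && !isDeployed) return "offline: ignore";
  if (recentlyModified)                           return "recently modified: skip";
  if (inMeta && inHdfs && multiplyDeployed)       return "dupe assignment: fixable";
  if (inMeta && inHdfs && isDeployed && !matchesMeta)
                                                  return "wrong server: fixable";
  return "other (elided in this diff)";
}
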
@@ -586,8 +592,8 @@ public class HBaseFsck {
if (hbi.foundRegionDir == null) continue;
if (hbi.deployedOn.isEmpty()
&& !couldNotScan.contains(hbi.metaEntry.regionServer)) continue;
- if (hbi.onlyEdits) continue;
-
+ if (hbi.onlyEdits) continue;
+
// We should be safe here
String tableName = hbi.metaEntry.getTableDesc().getNameAsString();
TInfo modTInfo = tablesInfo.get(tableName);
@@ -600,11 +606,11 @@ public class HBaseFsck {
modTInfo.addEdge(hbi.metaEntry.getStartKey(), hbi.metaEntry.getEndKey());
tablesInfo.put(tableName, modTInfo);
}
-
+
for (TInfo tInfo : tablesInfo.values()) {
if (tInfo.getName().equals(UnkownTable)) continue;
if (!tInfo.check()) {
- errors.reportError("Found inconsistency in table " + tInfo.getName() +
+ errors.reportError("Found inconsistency in table " + tInfo.getName() +
": " + tInfo.getLastError());
}
}
@@ -626,9 +632,9 @@ public class HBaseFsck {
private class TInfo {
String tableName;
TreeMap <byte[], byte[]> edges;
- TreeSet <HServerAddress> deployedOn;
+ TreeSet <HServerAddress> deployedOn;
String lastError = null;
-
+
private TreeMap<RegionType, ArrayList<String>> regionDetails = new TreeMap<RegionType, ArrayList<String>>();
private ArrayList<String> regionErrors = new ArrayList<String>();
@@ -657,7 +663,7 @@ public class HBaseFsck {
return edges.size();
}
- public String getLastError() {
+ public String getLastError() {
return this.lastError;
}
@@ -676,23 +682,23 @@ public class HBaseFsck {
errors.detail('\t' + regionToStr(e));
}
}
-
+
byte[] last = new byte[0];
byte[] next = new byte[0];
TreeSet <byte[]> visited = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- // Each table should start with a zero-length byte[] and end at a
+ // Each table should start with a zero-length byte[] and end at a
// zero-length byte[]. Just follow the edges to see if this is true
while (true) {
// Check if region chain is broken
if (!edges.containsKey(last)) {
- this.lastError = "Cannot find region with start key "
+ this.lastError = "Cannot find region with start key "
+ posToStr(last);
return false;
}
next = edges.get(last);
// Found a cycle
if (visited.contains(next)) {
- this.lastError = "Cycle found in region chain. "
+ this.lastError = "Cycle found in region chain. "
+ "Current = "+ posToStr(last)
+ "; Cycle Start = " + posToStr(next);
return false;
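
The loop above is the heart of TInfo.check(): regions are edges from startKey to endKey, and a healthy table is one unbroken chain from the empty start key to an empty end key. A self-contained sketch of the same walk; the add/advance steps fall outside this hunk, so they are reconstructed here:

import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.util.Bytes;

final class RegionChainWalk {
  // Returns null when the startKey -> endKey edges form one complete chain;
  // otherwise returns a description of the first hole or cycle found.
  // edges must be a TreeMap built with Bytes.BYTES_COMPARATOR, as TInfo does.
  static String check(TreeMap<byte[], byte[]> edges) {
    byte[] last = new byte[0];
    TreeSet<byte[]> visited = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    while (true) {
      if (!edges.containsKey(last)) {           // broken chain: a hole
        return "no region starts at " + Bytes.toStringBinary(last);
      }
      byte[] next = edges.get(last);
      if (visited.contains(next)) {             // revisited an end key
        return "cycle at " + Bytes.toStringBinary(next);
      }
      visited.add(next);
      if (next.length == 0) break;              // reached the end of the table
      last = next;
    }
    // A side chain never reached from the empty key shows up here.
    return edges.size() == visited.size() ? null : "unvisited regions remain";
  }
}
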
@@ -704,7 +710,7 @@ public class HBaseFsck {
// If we have visited all elements we are fine
if (edges.size() != visited.size()) {
this.lastError = "Region in-order travesal does not include "
- + "all elements found in META. Chain=" + visited.size()
+ + "all elements found in META. Chain=" + visited.size()
+ "; META=" + edges.size() + "; Missing=";
for (Map.Entry<byte[], byte []> e : edges.entrySet()) {
if (!visited.contains(e.getKey())) {
@@ -771,12 +777,12 @@ public class HBaseFsck {
}
}
-
+
/**
* Return a list of table names whose metadata have not been modified in the
* last few milliseconds specified by timelag
- * if any of the REGIONINFO_QUALIFIER, SERVER_QUALIFIER, STARTCODE_QUALIFIER,
- * SPLITA_QUALIFIER, SPLITB_QUALIFIER have not changed in the last
+ * if none of the REGIONINFO_QUALIFIER, SERVER_QUALIFIER, STARTCODE_QUALIFIER,
+ * SPLITA_QUALIFIER or SPLITB_QUALIFIER columns has changed in the last
* milliseconds specified by timelag, then the table is a candidate to be returned.
* @param regionList - all entries found in .META.
* @return tables that have not been modified recently
@@ -821,75 +827,38 @@ public class HBaseFsck {
}
/**
- * Check values in regionInfo for .META.
- * Check if zero or more than one regions with META are found.
- * If there are inconsistencies (i.e. zero or more than one regions
- * pretend to be holding the .META.) try to fix that and report an error.
- * @throws IOException from HBaseFsckRepair functions
+ * Collect the regionInfo entries that belong to the .META. table.
+ *
+ * @return list of meta regions
*/
- boolean checkMetaEntries() throws IOException {
- List <HbckInfo> metaRegions = Lists.newArrayList();
+ private List<HbckInfo> getMetaRegions() {
+ List<HbckInfo> metaRegions = Lists.newArrayList();
for (HbckInfo value : regionInfo.values()) {
if (value.metaEntry.isMetaTable()) {
metaRegions.add(value);
}
}
-
- // If something is wrong
- if (metaRegions.size() != 1) {
- HRegionLocation rootLocation = connection.locateRegion(
- HConstants.ROOT_TABLE_NAME, HConstants.EMPTY_START_ROW);
- HbckInfo root =
- regionInfo.get(rootLocation.getRegionInfo().getEncodedName());
-
- // If there is no region holding .META.
- if (metaRegions.size() == 0) {
- errors.reportWarning(".META. is not found on any region.");
- if (fix == FixState.ALL) {
- errors.print("Trying to fix a problem with .META...");
- // try to fix it (treat it as unassigned region)
- if (HBaseFsckRepair.fixUnassigned(conf, root.metaEntry)) {
- setShouldRerun();
- }
- }
- }
- // If there are more than one regions pretending to hold the .META.
- else if (metaRegions.size() > 1) {
- errors.reportFixableError(".META. is found on more than one region.");
- if (fix != FixState.NONE) {
- errors.print("Trying to fix a problem with .META...");
- // try fix it (treat is a dupe assignment)
- List <HServerAddress> deployedOn = Lists.newArrayList();
- for (HbckInfo mRegion : metaRegions) {
- deployedOn.add(mRegion.metaEntry.regionServer);
- }
- if (HBaseFsckRepair.fixDupeAssignment(conf, root.metaEntry, deployedOn )) {
- setShouldRerun();
- }
- }
- }
- // rerun hbck with hopefully fixed META
- return false;
- }
- // no errors, so continue normally
- return true;
+
+ return metaRegions;
}
/**
* Scan .META. and -ROOT-, adding all regions found to the regionInfo map.
* @throws IOException if an error is encountered
*/
- void getMetaEntries() throws IOException {
+ int getMetaEntries() throws IOException {
MetaScannerVisitor visitor = new MetaScannerVisitor() {
int countRecord = 1;
// comparator to sort KeyValues with latest modtime
final Comparator<KeyValue> comp = new Comparator<KeyValue>() {
+ @Override
public int compare(KeyValue k1, KeyValue k2) {
return (int)(k1.getTimestamp() - k2.getTimestamp());
}
};
+ @Override
public boolean processRow(Result result) throws IOException {
try {
@@ -897,7 +866,7 @@ public class HBaseFsck {
long ts = Collections.max(result.list(), comp).getTimestamp();
// record region details
- byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
+ byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER);
HRegionInfo info = null;
HServerAddress server = null;
@@ -915,7 +884,7 @@ public class HBaseFsck {
}
// record region's start code
- value = result.getValue(HConstants.CATALOG_FAMILY,
+ value = result.getValue(HConstants.CATALOG_FAMILY,
HConstants.STARTCODE_QUALIFIER);
if (value != null) {
startCode = value;
@@ -927,7 +896,7 @@ public class HBaseFsck {
throw new IOException("Two entries in META are same " + previous);
}
- // show proof of progress to the user, once for every 100 records.
+ // show proof of progress to the user, once for every 100 records.
if (countRecord % 100 == 0) {
errors.progress();
}
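
One caveat on the comparator above, which predates this change: casting the difference of two long timestamps to int can overflow and invert the ordering, and HConstants.LATEST_TIMESTAMP is Long.MAX_VALUE, where it certainly would. An overflow-safe form, offered as a suggestion rather than part of this commit (imports of java.util.Comparator and KeyValue as in the surrounding file):

// Overflow-safe variant: compare, don't subtract-and-cast.
final Comparator<KeyValue> comp = new Comparator<KeyValue>() {
  @Override
  public int compare(KeyValue k1, KeyValue k2) {
    long t1 = k1.getTimestamp(), t2 = k2.getTimestamp();
    return t1 < t2 ? -1 : (t1 == t2 ? 0 : 1);
  }
};
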
@@ -944,10 +913,56 @@ public class HBaseFsck {
MetaScanner.metaScan(conf, visitor,
HConstants.ROOT_TABLE_NAME, HConstants.EMPTY_START_ROW, null,
Integer.MAX_VALUE);
-
+
+ List<HbckInfo> metaRegions = getMetaRegions();
+ // If there is no region holding .META., no way to fix
+ if (metaRegions.isEmpty()) {
+ errors.reportError("No META record in ROOT, not able to fix");
+ return -1;
+ }
+
// Scan .META. to pick up user regions
- MetaScanner.metaScan(conf, visitor);
- errors.print("");
+ try {
+ MetaScanner.metaScan(conf, visitor);
+ } catch (NotServingRegionException e) {
+ errors.reportWarning("META is not assigned");
+
+ boolean hasNonFixableFailure = false;
+
+ if (fix != FixState.NONE) {
+ errors.print("Trying to fix unassigned META...");
+ for (HbckInfo hbi : metaRegions) {
+ // We don't know which META region is unassigned (there may be more than one),
+ // so try to reassign all of them. Since more than one META region is not
+ // expected, this suboptimal fix should be fine for now.
+ if (hbi.metaEntry.isMetaTable()) {
+ String addr = hbi.metaEntry.regionServer.getHostAddressWithPort();
+ // Do not clean the META assignment while log splitting may still be in progress on the RS.
+ if (status.getDeadServerNames().contains(addr) && !forceCleanMeta) {
+ errors.print("Split log for dead server " + addr
+ + " is possibly ongoing, skip unassigned META region "
+ + hbi.metaEntry.getEncodedName());
+ hasNonFixableFailure = true;
+ } else if (!HBaseFsckRepair.fixUnassigned(this.conf, hbi.metaEntry)) {
+ hasNonFixableFailure = true;
+ }
+ }
+ }
+ } else {
+ errors.print("Fix not enabled, won't do anything");
+ throw e;
+ }
+
+ if (hasNonFixableFailure) {
+ errors.reportError("Some META region assignment issue could not be fixed, exiting...");
+ } else {
+ errors.print("Fixed some issue, will rerun");
+ setShouldRerun();
+ }
+ return -1;
+ }
+
+ return 0;
}
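
getMetaEntries() now signals through its return value instead of returning void: 0 means both catalogs were scanned, -1 means the scan stopped early (no META record in ROOT, or META unassigned) and either a fix was queued via setShouldRerun() or nothing could be done. A hedged sketch of a caller honoring that contract; the real call site is outside this diff:

// Hypothetical caller, not from the patch: stop the pass early when the
// catalogs could not be read, and let the rerun machinery take over.
int checkCatalogThenRegions() throws IOException, InterruptedException {
  if (getMetaEntries() != 0) {
    return -1;           // fix queued (shouldRerun()) or unfixable; bail out
  }
  scanRegionServers();   // safe: META entries are loaded
  return 0;
}
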
/**
@@ -957,7 +972,7 @@ public class HBaseFsck {
HServerAddress regionServer; // server hosting this region
long modTime; // timestamp of the most recent metadata modification
- public MetaEntry(HRegionInfo rinfo, HServerAddress regionServer,
+ public MetaEntry(HRegionInfo rinfo, HServerAddress regionServer,
byte[] startCode, long modTime) {
super(rinfo);
this.regionServer = regionServer;
@@ -982,6 +997,7 @@ public class HBaseFsck {
this.deployedOn.add(server);
}
+ @Override
public synchronized String toString() {
if (metaEntry != null) {
return metaEntry.getRegionNameAsString();
@@ -1013,7 +1029,7 @@ public class HBaseFsck {
System.out.println("\n");
}
}
-
+
interface ErrorReporter {
public void reportWarning(String message);
public void reportError(String message);
@@ -1023,13 +1039,14 @@ public class HBaseFsck {
public void progress();
public void print(String message);
}
-
+
private static class PrintingErrorReporter implements ErrorReporter {
public int warnCount = 0;
public int errorCount = 0;
public int fixableCount = 0;
private int showProgress;
+ @Override
public synchronized void reportWarning(String message) {
if (!summary) {
if (HBaseFsck.json == null) {
@@ -1039,6 +1056,7 @@ public class HBaseFsck {
warnCount++;
}
+ @Override
public synchronized void reportError(String message) {
if (!summary) {
if (HBaseFsck.json == null) {
@@ -1049,6 +1067,7 @@ public class HBaseFsck {
showProgress = 0;
}
+ @Override
public synchronized void reportFixableError(String message) {
if (!summary) {
if (HBaseFsck.json == null) {
@@ -1059,6 +1078,7 @@ public class HBaseFsck {
showProgress = 0;
}
+ @Override
public synchronized int summarize() {
if (HBaseFsck.json == null) {
System.out.println(Integer.toString(errorCount + fixableCount) +
@@ -1088,7 +1108,8 @@ public class HBaseFsck {
return -2;
}
}
-
+
+ @Override
public synchronized void print(String message) {
if (HBaseFsck.json != null) return;
if (!summary) {
@@ -1096,6 +1117,7 @@ public class HBaseFsck {
}
}
+ @Override
public synchronized void detail(String message) {
if (details) {
if (HBaseFsck.json == null){
@@ -1105,6 +1127,7 @@ public class HBaseFsck {
showProgress = 0;
}
+ @Override
public synchronized void progress() {
if (showProgress++ == 10) {
if (!summary) {
@@ -1131,7 +1154,7 @@ public class HBaseFsck {
private HConnection connection;
private boolean done;
- WorkItemRegion(HBaseFsck hbck, HServerInfo info,
+ WorkItemRegion(HBaseFsck hbck, HServerInfo info,
ErrorReporter errors, HConnection connection) {
this.hbck = hbck;
this.rsinfo = info;
@@ -1141,6 +1164,7 @@ public class HBaseFsck {
}
// is this task done?
+ @Override
public synchronized boolean isDone() {
return done;
}
@@ -1154,7 +1178,7 @@ public class HBaseFsck {
// list all online regions from this region server
HRegionInfo[] regions = server.getRegionsAssignment();
-
+
if (details) {
StringBuffer buf = new StringBuffer();
buf.append("\nRegionServer:" + rsinfo.getServerName() +
@@ -1174,7 +1198,7 @@ public class HBaseFsck {
HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName());
hbi.addServer(rsinfo.getServerAddress());
}
- } catch (IOException e) { // unable to connect to the region server.
+ } catch (IOException e) { // unable to connect to the region server.
errors.reportWarning("RegionServer: " + rsinfo.getServerName()
+ " Unable to fetch region information. " + e);
hbck.addFailedServer(rsinfo.getServerAddress());
@@ -1195,7 +1219,7 @@ public class HBaseFsck {
private FileSystem fs;
private boolean done;
- WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors,
+ WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors,
FileStatus status) {
this.hbck = hbck;
this.fs = fs;
@@ -1204,9 +1228,10 @@ public class HBaseFsck {
this.done = false;
}
+ @Override
public synchronized boolean isDone() {
return done;
- }
+ }
@Override
public synchronized void run() {
@@ -1223,7 +1248,7 @@ public class HBaseFsck {
// ignore directories that aren't hexadecimal
if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
-
+
HbckInfo hbi = hbck.getOrCreateInfo(encodedName);
synchronized (hbi) {
if (hbi.foundRegionDir != null) {
@@ -1231,7 +1256,7 @@ public class HBaseFsck {
hbi.foundRegionDir);
}
hbi.foundRegionDir = regionDir;
-
+
// Set a flag if this region contains only edits
// This is a special case: a region directory left over after a split
hbi.onlyEdits = true;
@@ -1246,7 +1271,7 @@ public class HBaseFsck {
}
}
}
- } catch (IOException e) { // unable to connect to the region server.
+ } catch (IOException e) { // unable to read the region directories.
errors.reportError("Table Directory: " + tableDir.getPath().getName() +
" Unable to fetch region information. " + e);
} finally {
@@ -1296,7 +1321,7 @@ public class HBaseFsck {
void setShouldRerun() {
rerun = true;
}
-
+
boolean shouldRerun() {
return rerun;
}
@@ -1308,9 +1333,9 @@ public class HBaseFsck {
void setFixState(FixState newVal) {
fix = newVal;
}
-
+
/**
- * Let the user allow the opportunity to specify "-y" to all
+ * Give the user the opportunity to apply "-y" to all
* reconfirmation questions.
*/
static void setPromptResponse(boolean value) {
@@ -1319,7 +1344,7 @@ public class HBaseFsck {
static boolean getPromptResponse() {
return promptResponse;
}
-
+
/**
* We are interested in only those tables that have not changed their state in
* META during the last few seconds specified by hbase.admin.fsck.timelag
@@ -1328,7 +1353,7 @@ public class HBaseFsck {
void setTimeLag(long ms) {
timelag = ms;
}
-
+
/**
* Sets the output json file where the table summaries are written to
* @param summaryFileName - the file name
@@ -1343,18 +1368,28 @@ public class HBaseFsck {
* @param args
* @throws ParseException
*/
- public static void main(String [] args)
+ public static void main(String [] args)
throws IOException,
MasterNotRunningException, InterruptedException, ParseException {
Options opt = new Options();
- opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
- .withDescription("Override HBase Configuration Settings").create("D"));
- opt.addOption(OptionBuilder.withArgName("timeInSeconds").hasArg()
- .withDescription("Ignore regions with metadata updates in the last {timeInSeconds}.")
- .withType(PatternOptionBuilder.NUMBER_VALUE).create("timelag"));
- opt.addOption(OptionBuilder.withArgName("timeInSeconds").hasArg()
- .withDescription("Stop scan jobs after a fixed time & analyze existing data.")
- .withType(PatternOptionBuilder.NUMBER_VALUE).create("timeout"));
+
+ OptionBuilder.withArgName("property=value");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("Override HBase Configuration Settings");
+ opt.addOption(OptionBuilder.create("D"));
+
+ OptionBuilder.withArgName("timeInSeconds");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("Ignore regions with metadata updates in the last {timeInSeconds}.");
+ OptionBuilder.withType(PatternOptionBuilder.NUMBER_VALUE);
+ opt.addOption(OptionBuilder.create("timelag"));
+
+ OptionBuilder.withArgName("timeInSeconds");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("Stop scan jobs after a fixed time & analyze existing data.");
+ OptionBuilder.withType(PatternOptionBuilder.NUMBER_VALUE);
+ OptionBuilder.create("timeout");
+
opt.addOption("fix", false, "Try to fix some of the errors.");
opt.addOption("y", false, "Do not prompt for reconfirmation from users on fix.");
opt.addOption("w", false, "Try to fix warnings as well as errors.");
@@ -1363,6 +1398,9 @@ public class HBaseFsck {
opt.addOption("checkRegionInfo", false, "Check if .regioninfo is consistent with .META.");
opt.addOption("h", false, "Display this help");
opt.addOption("json", false, "Outputs the table summaries and errors in JSON format");
+ opt.addOption("forceCleanMeta", false,
+ "Clean meta entry of unassigned regions regardless of possible log splitting related to this region");
+
CommandLine cmd = new GnuParser().parse(opt, args);
// any unknown args or -h
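
A note on the rewrite of the option setup above: OptionBuilder accumulates the pending option in static fields, and create() both consumes and resets that state, so each create() result has to be registered with addOption() before the next option's description begins. A minimal sketch of the unchained pattern:

import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PatternOptionBuilder;

final class OptionSetupSketch {
  @SuppressWarnings("static-access")   // OptionBuilder is a static builder
  static Options build() {
    Options opt = new Options();
    OptionBuilder.withArgName("timeInSeconds");
    OptionBuilder.hasArg();
    OptionBuilder.withDescription("Stop scan jobs after a fixed time.");
    OptionBuilder.withType(PatternOptionBuilder.NUMBER_VALUE);
    // create() consumes the state above; a bare create() whose result is
    // never passed to addOption() silently drops the option.
    opt.addOption(OptionBuilder.create("timeout"));
    return opt;
  }
}
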
@@ -1370,7 +1408,7 @@ public class HBaseFsck {
new HelpFormatter().printHelp("hbck", opt);
return;
}
-
+
if (cmd.hasOption("json")) {
Logger.getLogger("org.apache.zookeeper").setLevel(Level.OFF);
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.OFF);
@@ -1425,7 +1463,7 @@ public class HBaseFsck {
fsck.setFixState(FixState.ALL);
}
if (cmd.hasOption("y")) {
- fsck.setPromptResponse(true);
+ HBaseFsck.setPromptResponse(true);
}
if (cmd.hasOption("summary")) {
fsck.setSummary();
@@ -1436,6 +1474,10 @@ public class HBaseFsck {
if (cmd.hasOption("json")) {
fsck.setJsonFlag("json");
}
+ if (cmd.hasOption("forceCleanMeta")) {
+ forceCleanMeta = true;
+ }
+
int code = -1;
try {
// do the real work of fsck
@@ -1446,7 +1488,7 @@ public class HBaseFsck {
fsck.setFixState(FixState.NONE);
long fixTime = HBaseFsckRepair.getEstimatedFixTime(conf);
if (fixTime > 0) {
- LOG.info("Waiting " + StringUtils.formatTime(fixTime) +
+ LOG.info("Waiting " + StringUtils.formatTime(fixTime) +
" before checking to see if fixes worked...");
Thread.sleep(fixTime);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseFsckRepair.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseFsckRepair.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseFsckRepair.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseFsckRepair.java Wed Mar 12 21:17:13 2014
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HServerAd
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.zookeeper.KeeperException;
public class HBaseFsckRepair {
@@ -73,27 +72,27 @@ public class HBaseFsckRepair {
// Clear status in master and zk
clearInMaster(conf, actualRegion);
clearInZK(conf, actualRegion);
-
+
// Clear assignment in META or ROOT
clearAssignment(conf, actualRegion);
return true;
}
-
+
public static int getEstimatedFixTime(Configuration conf)
throws IOException {
// Fix Time ~=
// META rescan interval (when master notices region is unassigned)
// + Time to Replay Recovered Edits (flushing HLogs == main bottleneck)
- int metaRescan = conf.getInt("hbase.master.meta.thread.rescanfrequency",
+ int metaRescan = conf.getInt("hbase.master.meta.thread.rescanfrequency",
60 * 1000);
// estimate = HLog Size * Max HLogs / Throughput [1 Gbps / 2 == 60MBps]
Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(conf);
long logSize = conf.getLong("hbase.regionserver.hlog.blocksize",
- fs.getDefaultBlockSize())
+ fs.getDefaultBlockSize())
* conf.getInt("hbase.regionserver.maxlogs", 32);
- int recoverEdits = (int)(logSize / (60*1000*1000));
+ int recoverEdits = (int)(logSize / (60 * 1000)); // ms: bytes / (60,000 bytes per ms at 60 MB/s)
int pad = 1000; // 1 sec pad
return metaRescan + recoverEdits + pad;
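
Concretely, with the defaults visible here and a hypothetical 64 MB default block size from fs.getDefaultBlockSize(), the wait comes to about a minute and a half:

// Back-of-envelope check of the estimate above (assumed 64 MB block size).
public final class FixTimeEstimate {
  public static void main(String[] args) {
    long blockSize = 64L * 1024 * 1024;                // assumed HDFS default
    long logSize = blockSize * 32;                     // maxlogs default = 32
    int metaRescan = 60 * 1000;                        // rescan default, ms
    int recoverEdits = (int) (logSize / (60 * 1000));  // ms at 60 MB/s
    int pad = 1000;                                    // 1 s pad
    System.out.println(metaRescan + recoverEdits + pad); // ~96791 ms
  }
}
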
@@ -122,7 +121,7 @@ public class HBaseFsckRepair {
private static void closeRegion(Configuration conf, HServerAddress server,
HRegionInfo region)
throws IOException {
- HRegionInterface rs =
+ HRegionInterface rs =
HConnectionManager.getConnection(conf).getHRegionConnection(server);
rs.closeRegion(region, false);
}
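
Putting the HBaseFsckRepair pieces visible in this diff together, fixUnassigned() is a three-step forget-and-reassign. A hedged outline; the helper bodies are elided by the diff, so treat this as a reading aid, not the actual implementation:

// Outline only; clearInMaster, clearInZK and clearAssignment appear in the
// hunk above, but their bodies are not shown here.
public static boolean fixUnassignedOutline(Configuration conf,
    HRegionInfo actualRegion) throws IOException {
  clearInMaster(conf, actualRegion);    // drop the master's stale view
  clearInZK(conf, actualRegion);        // drop the region's znode state
  clearAssignment(conf, actualRegion);  // wipe the assignment in META/ROOT
  // With no assignment on record, the master's next META rescan reassigns
  // the region; getEstimatedFixTime() above sizes the wait for that.
  return true;
}
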
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java Wed Mar 12 21:17:13 2014
@@ -75,9 +75,9 @@ public class HBaseLocalityCheck {
HbckInfo hbckInfo = entry.getValue();
if (hbckInfo == null || hbckInfo.metaEntry == null
|| localityInfo == null || hbckInfo.deployedOn == null
- || hbckInfo.deployedOn.size() == 0) {
+ || hbckInfo.deployedOn.isEmpty()) {
LOG.warn("<" + regionEncodedName + "> no info" +
- " obtained for this region from any of the region servers.");
+ " obtained for this region from any of the region servers.");
numUnknownRegion++;
continue;
}
@@ -99,7 +99,7 @@ public class HBaseLocalityCheck {
}
LOG.info("<" + tableName + " : " + regionEncodedName +
"> is running on host: " + realHostName + " \n " +
- "and the locality is " + localityPercentage);
+ "and the locality is " + localityPercentage);
}
LOG.info("======== Locality Summary ===============");