Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/23 21:54:15 UTC
svn commit: r1425525 [4/7] - in /hbase/branches/0.94-test: ./ bin/ conf/
security/src/main/java/org/apache/hadoop/hbase/ipc/
security/src/main/java/org/apache/hadoop/hbase/security/access/
security/src/test/java/org/apache/hadoop/hbase/security/access/...
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sun Dec 23 20:54:12 2012
@@ -105,8 +105,6 @@ public class ReplicationSource extends T
private int replicationQueueNbCapacity;
// Our reader for the current log
private HLog.Reader reader;
- // Current position in the log
- private long position = 0;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
@@ -132,10 +130,15 @@ public class ReplicationSource extends T
private int currentNbEntries = 0;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
+ // Current size of data we need to replicate
+ private int currentSize = 0;
// Indicates if this particular source is running
private volatile boolean running = true;
// Metrics for this source
private ReplicationSourceMetrics metrics;
+ // Handle on the log reader helper
+ private ReplicationHLogReaderManager repLogReader;
+
/**
* Instantiation method used by region servers
@@ -183,7 +186,7 @@ public class ReplicationSource extends T
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
-
+ this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
try {
this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
} catch (KeeperException ke) {
@@ -263,8 +266,8 @@ public class ReplicationSource extends T
// normally has a position (unless the RS failed between 2 logs)
if (this.queueRecovered) {
try {
- this.position = this.zkHelper.getHLogRepPosition(
- this.peerClusterZnode, this.queue.peek().getName());
+ this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
+ this.peerClusterZnode, this.queue.peek().getName()));
} catch (KeeperException e) {
this.terminate("Couldn't get the position of this recovered queue " +
peerClusterZnode, e);
@@ -322,6 +325,7 @@ public class ReplicationSource extends T
boolean gotIOE = false;
currentNbEntries = 0;
+ currentSize = 0;
try {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
continue;
@@ -357,9 +361,7 @@ public class ReplicationSource extends T
}
} finally {
try {
- if (this.reader != null) {
- this.reader.close();
- }
+ this.repLogReader.closeReader();
} catch (IOException e) {
gotIOE = true;
LOG.warn("Unable to finalize the tailing of a file", e);
@@ -370,10 +372,10 @@ public class ReplicationSource extends T
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
- if (this.lastLoggedPosition != this.position) {
+ if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.position;
+ this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
+ this.lastLoggedPosition = this.repLogReader.getPosition();
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
sleepMultiplier++;
@@ -405,11 +407,9 @@ public class ReplicationSource extends T
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
throws IOException{
long seenEntries = 0;
- if (this.position != 0) {
- this.reader.seek(this.position);
- }
- long startPosition = this.position;
- HLog.Entry entry = readNextAndSetPosition();
+ this.repLogReader.seek();
+ HLog.Entry entry =
+ this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
while (entry != null) {
WALEdit edit = entry.getEdit();
this.metrics.logEditsReadRate.inc(1);
@@ -433,18 +433,18 @@ public class ReplicationSource extends T
}
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
+ currentSize += entry.getEdit().heapSize();
} else {
this.metrics.logEditsFilteredRate.inc(1);
}
}
// Stop if too many entries or too big
- if ((this.reader.getPosition() - startPosition)
- >= this.replicationQueueSizeCapacity ||
+ if (currentSize >= this.replicationQueueSizeCapacity ||
currentNbEntries >= this.replicationQueueNbCapacity) {
break;
}
try {
- entry = readNextAndSetPosition();
+ entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
} catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage());
break;
@@ -452,7 +452,7 @@ public class ReplicationSource extends T
}
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
- " and size: " + (this.reader.getPosition() - startPosition));
+ " and size: " + this.currentSize);
if (currentWALisBeingWrittenTo) {
return false;
}
@@ -461,16 +461,6 @@ public class ReplicationSource extends T
return seenEntries == 0 && processEndOfFile();
}
- private HLog.Entry readNextAndSetPosition() throws IOException {
- HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
- // Store the position so that in the future the reader can start
- // reading from here. If the above call to next() throws an
- // exception, the position won't be changed and retry will happen
- // from the last known good position
- this.position = this.reader.getPosition();
- return entry;
- }
-
private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop
while (this.isActive() && this.currentPeers.size() == 0) {
@@ -509,10 +499,9 @@ public class ReplicationSource extends T
protected boolean openReader(int sleepMultiplier) {
try {
LOG.debug("Opening log for replication " + this.currentPath.getName() +
- " at " + this.position);
+ " at " + this.repLogReader.getPosition());
try {
- this.reader = null;
- this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+ this.reader = repLogReader.openReader(this.currentPath);
} catch (FileNotFoundException fnfe) {
if (this.queueRecovered) {
// We didn't find the log in the archive directory, look if it still
@@ -648,10 +637,10 @@ public class ReplicationSource extends T
HRegionInterface rrs = getRS();
LOG.debug("Replicating " + currentNbEntries);
rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
- if (this.lastLoggedPosition != this.position) {
+ if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.position;
+ this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
+ this.lastLoggedPosition = this.repLogReader.getPosition();
}
this.totalReplicatedEdits += currentNbEntries;
this.metrics.shippedBatchesRate.inc(1);
@@ -721,7 +710,8 @@ public class ReplicationSource extends T
protected boolean processEndOfFile() {
if (this.queue.size() != 0) {
this.currentPath = null;
- this.position = 0;
+ this.repLogReader.finishCurrentFile();
+ this.reader = null;
return true;
} else if (this.queueRecovered) {
this.manager.closeRecoveredQueue(this);
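The net effect of these hunks is that ReplicationSource no longer tracks a raw byte offset itself: seeking, reading, and position bookkeeping all move behind ReplicationHLogReaderManager, and batch capacity is now accounted in heap bytes of the edits (currentSize) rather than byte-offset deltas in the file. A minimal sketch of the reader-manager pattern, with illustrative names rather than the actual class:

    // Illustrative reduction of the reader-manager idea; not the real
    // ReplicationHLogReaderManager. HLog.Reader/Entry are the 0.94 WAL types.
    class LogReaderPositionManager {
      private HLog.Reader reader;
      private long position = 0;

      void setPosition(long pos) { this.position = pos; }
      long getPosition() { return position; }

      // Only seek when there is a known-good position to resume from.
      void seek() throws IOException {
        if (position != 0) reader.seek(position);
      }

      // Save the new position only after a successful read, so a failed
      // next() retries from the last known-good offset.
      HLog.Entry readNextAndSetPosition(HLog.Entry reuse) throws IOException {
        HLog.Entry entry = reader.next(reuse);
        position = reader.getPosition();
        return entry;
      }

      // Reset when moving on to the next log in the queue.
      void finishCurrentFile() { position = 0; }
    }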
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Sun Dec 23 20:54:12 2012
@@ -72,7 +72,6 @@ public class RemoteHTable implements HTa
final Client client;
final Configuration conf;
final byte[] name;
- final String accessToken;
final int maxRetries;
final long sleepTime;
@@ -81,10 +80,6 @@ public class RemoteHTable implements HTa
final long startTime, final long endTime, final int maxVersions) {
StringBuffer sb = new StringBuffer();
sb.append('/');
- if (accessToken != null) {
- sb.append(accessToken);
- sb.append('/');
- }
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append(Bytes.toStringBinary(row));
@@ -142,6 +137,29 @@ public class RemoteHTable implements HTa
return sb.toString();
}
+ protected String buildMultiRowSpec(final byte[][] rows, int maxVersions) {
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append("/multiget/");
+ if (rows == null || rows.length == 0) {
+ return sb.toString();
+ }
+ sb.append("?");
+ for(int i=0; i<rows.length; i++) {
+ byte[] rk = rows[i];
+ if (i != 0) {
+ sb.append('&');
+ }
+ sb.append("row=");
+ sb.append(Bytes.toStringBinary(rk));
+ }
+ sb.append("&v=");
+ sb.append(maxVersions);
+
+ return sb.toString();
+ }
+
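A quick worked example of the spec built above, with a hypothetical table and rows: for name "t1", rows {"r1", "r2"} and maxVersions 2, buildMultiRowSpec returns

    /t1/multiget/?row=r1&row=r2&v=2

Note that the early return for an empty row list yields just "/t1/multiget/", with no v parameter.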
protected Result[] buildResultFromModel(final CellSetModel model) {
List<Result> results = new ArrayList<Result>();
for (RowModel row: model.getRows()) {
@@ -187,7 +205,9 @@ public class RemoteHTable implements HTa
* @param client
* @param name
* @param accessToken
+ * @deprecated accessToken is not used and will be removed
*/
+ @Deprecated
public RemoteHTable(Client client, String name, String accessToken) {
this(client, HBaseConfiguration.create(), Bytes.toBytes(name), accessToken);
}
@@ -197,8 +217,20 @@ public class RemoteHTable implements HTa
* @param client
* @param conf
* @param name
+ */
+ public RemoteHTable(Client client, Configuration conf, String name) {
+ this(client, conf, Bytes.toBytes(name), null);
+ }
+
+ /**
+ * Constructor
+ * @param client
+ * @param conf
+ * @param name
* @param accessToken
+ * @deprecated accessToken is not used and will be removed
*/
+ @Deprecated
public RemoteHTable(Client client, Configuration conf, String name,
String accessToken) {
this(client, conf, Bytes.toBytes(name), accessToken);
@@ -206,14 +238,28 @@ public class RemoteHTable implements HTa
/**
* Constructor
+ * @param client
+ * @param conf
+ * @param name
+ */
+ public RemoteHTable(Client client, Configuration conf, byte[] name) {
+ this(client, conf, name, null);
+ }
+
+ /**
+ * Constructor
+ * @param client
* @param conf
+ * @param name
+ * @param accessToken
+ * @deprecated accessToken is not used and will be removed
*/
+ @Deprecated
public RemoteHTable(Client client, Configuration conf, byte[] name,
String accessToken) {
this.client = client;
this.conf = conf;
this.name = name;
- this.accessToken = accessToken;
this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10);
this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000);
}
@@ -229,10 +275,6 @@ public class RemoteHTable implements HTa
public HTableDescriptor getTableDescriptor() throws IOException {
StringBuilder sb = new StringBuilder();
sb.append('/');
- if (accessToken != null) {
- sb.append(accessToken);
- sb.append('/');
- }
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append("schema");
@@ -267,30 +309,68 @@ public class RemoteHTable implements HTa
if (get.getFilter() != null) {
LOG.warn("filters not supported on gets");
}
+ Result[] results = getResults(spec);
+ if (results.length > 0) {
+ if (results.length > 1) {
+ LOG.warn("too many results for get (" + results.length + ")");
+ }
+ return results[0];
+ } else {
+ return new Result();
+ }
+ }
+
+ public Result[] get(List<Get> gets) throws IOException {
+ byte[][] rows = new byte[gets.size()][];
+ int maxVersions = 1;
+ int count = 0;
+
+ for (Get g : gets) {
+
+ if (count == 0) {
+ maxVersions = g.getMaxVersions();
+ } else if (g.getMaxVersions() != maxVersions) {
+ LOG.warn("MaxVersions on Gets do not match, using the first in the list ("
+ + maxVersions +")");
+ }
+
+ if (g.getFilter() != null) {
+ LOG.warn("filters not supported on gets");
+ }
+
+ rows[count] = g.getRow();
+ count++;
+ }
+
+ String spec = buildMultiRowSpec(rows, maxVersions);
+
+ return getResults(spec);
+ }
+
+ private Result[] getResults(String spec) throws IOException {
for (int i = 0; i < maxRetries; i++) {
Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
int code = response.getCode();
switch (code) {
- case 200:
- CellSetModel model = new CellSetModel();
- model.getObjectFromMessage(response.getBody());
- Result[] results = buildResultFromModel(model);
- if (results.length > 0) {
- if (results.length > 1) {
- LOG.warn("too many results for get (" + results.length + ")");
+ case 200:
+ CellSetModel model = new CellSetModel();
+ model.getObjectFromMessage(response.getBody());
+ Result[] results = buildResultFromModel(model);
+ if (results.length > 0) {
+ return results;
}
- return results[0];
- }
- // fall through
- case 404:
- return new Result();
- case 509:
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) { }
- break;
- default:
- throw new IOException("get request returned " + code);
+ // fall through
+ case 404:
+ return new Result[0];
+
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ break;
+ default:
+ throw new IOException("get request returned " + code);
}
}
throw new IOException("get request timed out");
@@ -306,10 +386,6 @@ public class RemoteHTable implements HTa
CellSetModel model = buildModelFromPut(put);
StringBuilder sb = new StringBuilder();
sb.append('/');
- if (accessToken != null) {
- sb.append(accessToken);
- sb.append('/');
- }
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append(Bytes.toStringBinary(put.getRow()));
@@ -364,10 +440,6 @@ public class RemoteHTable implements HTa
// build path for multiput
StringBuilder sb = new StringBuilder();
sb.append('/');
- if (accessToken != null) {
- sb.append(accessToken);
- sb.append('/');
- }
sb.append(Bytes.toStringBinary(name));
sb.append("/$multiput"); // can be any nonexistent row
for (int i = 0; i < maxRetries; i++) {
@@ -433,10 +505,6 @@ public class RemoteHTable implements HTa
}
StringBuffer sb = new StringBuffer();
sb.append('/');
- if (accessToken != null) {
- sb.append(accessToken);
- sb.append('/');
- }
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append("scanner");
@@ -575,10 +643,16 @@ public class RemoteHTable implements HTa
throw new IOException("getRowOrBefore not supported");
}
+ /**
+ * @deprecated {@link RowLock} and associated operations are deprecated
+ */
public RowLock lockRow(byte[] row) throws IOException {
throw new IOException("lockRow not implemented");
}
+ /**
+ * @deprecated {@link RowLock} and associated operations are deprecated
+ */
public void unlockRow(RowLock rl) throws IOException {
throw new IOException("unlockRow not implemented");
}
@@ -591,10 +665,6 @@ public class RemoteHTable implements HTa
CellSetModel model = buildModelFromPut(put);
StringBuilder sb = new StringBuilder();
sb.append('/');
- if (accessToken != null) {
- sb.append(accessToken);
- sb.append('/');
- }
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append(Bytes.toStringBinary(put.getRow()));
@@ -630,10 +700,6 @@ public class RemoteHTable implements HTa
CellSetModel model = buildModelFromPut(put);
StringBuilder sb = new StringBuilder();
sb.append('/');
- if (accessToken != null) {
- sb.append(accessToken);
- sb.append('/');
- }
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append(Bytes.toStringBinary(row));
@@ -690,11 +756,6 @@ public class RemoteHTable implements HTa
}
@Override
- public Result[] get(List<Get> gets) throws IOException {
- throw new IOException("get(List<Get>) not supported");
- }
-
- @Override
public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
byte[] row) {
throw new
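With the old get(List<Get>) stub removed, multi-get now works end to end over REST. A hedged usage sketch (client setup and names are illustrative):

    // Hypothetical usage; restClient and conf come from the surrounding app.
    RemoteHTable table = new RemoteHTable(restClient, conf, "t1");
    List<Get> gets = new ArrayList<Get>();
    gets.add(new Get(Bytes.toBytes("row1")));
    gets.add(new Get(Bytes.toBytes("row2")));
    Result[] results = table.get(gets);  // single round trip to /t1/multiget/
    // Gets with differing maxVersions are coerced to the first one, with a warning.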
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/security/User.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/security/User.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/security/User.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/security/User.java Sun Dec 23 20:54:12 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.security
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.mapred.JobConf;
@@ -216,12 +217,15 @@ public abstract class User {
}
/**
- * Returns whether or not secure authentication is enabled for HBase
- * (whether <code>hbase.security.authentication</code> is set to
- * <code>kerberos</code>.
+ * Returns whether or not secure authentication is enabled for HBase. Note that
+ * HBase security requires HDFS security to provide any guarantees, so this requires that
+ * both <code>hbase.security.authentication</code> and <code>hadoop.security.authentication</code>
+ * are set to <code>kerberos</code>.
*/
public static boolean isHBaseSecurityEnabled(Configuration conf) {
- return "kerberos".equalsIgnoreCase(conf.get(HBASE_SECURITY_CONF_KEY));
+ return "kerberos".equalsIgnoreCase(conf.get(HBASE_SECURITY_CONF_KEY)) &&
+ "kerberos".equalsIgnoreCase(
+ conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
}
/* Concrete implementations */
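The practical consequence is that a configuration setting only the HBase key no longer counts as secure. A minimal sketch of the new contract:

    // Both keys must be kerberos for isHBaseSecurityEnabled to return true.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.security.authentication", "kerberos");
    conf.set("hadoop.security.authentication", "kerberos");
    boolean secure = User.isHBaseSecurityEnabled(conf);  // true only with both set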
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java Sun Dec 23 20:54:12 2012
@@ -63,7 +63,7 @@ public abstract class AbstractHBaseTool
protected abstract void processOptions(CommandLine cmd);
/** The "main function" of the tool */
- protected abstract void doWork() throws Exception;
+ protected abstract int doWork() throws Exception;
@Override
public Configuration getConf() {
@@ -99,13 +99,14 @@ public abstract class AbstractHBaseTool
processOptions(cmd);
+ int ret = EXIT_FAILURE;
try {
- doWork();
+ ret = doWork();
} catch (Exception e) {
LOG.error("Error running command-line tool", e);
return EXIT_FAILURE;
}
- return EXIT_SUCCESS;
+ return ret;
}
private boolean sanityCheckOptions(CommandLine cmd) {
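Tools extending AbstractHBaseTool must now return their exit code from doWork rather than relying on exceptions alone. A sketch of a conforming subclass (class name and helper are hypothetical):

    public class ExampleTool extends AbstractHBaseTool {
      @Override
      protected void addOptions() { /* no extra options in this sketch */ }

      @Override
      protected void processOptions(CommandLine cmd) { /* nothing to parse */ }

      @Override
      protected int doWork() throws Exception {
        boolean ok = doTheWork();  // hypothetical helper
        return ok ? EXIT_SUCCESS : EXIT_FAILURE;
      }

      private boolean doTheWork() { return true; }
    }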
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java Sun Dec 23 20:54:12 2012
@@ -25,9 +25,6 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ChecksumFactory;
-
/**
* Checksum types. The Checksum type is a one byte number
* that stores a representation of the checksum algorithm
@@ -70,7 +67,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32);
LOG.info("Checksum using " + PURECRC32);
} catch (Exception e) {
- LOG.info(PURECRC32 + " not available.");
+ LOG.trace(PURECRC32 + " not available.");
}
try {
// The default checksum class name is java.util.zip.CRC32.
@@ -80,7 +77,7 @@ public enum ChecksumType {
LOG.info("Checksum can use " + JDKCRC);
}
} catch (Exception e) {
- LOG.warn(JDKCRC + " not available. ", e);
+ LOG.trace(JDKCRC + " not available.");
}
}
@@ -113,7 +110,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32C);
LOG.info("Checksum can use " + PURECRC32C);
} catch (Exception e) {
- LOG.info(PURECRC32C + " not available. ");
+ LOG.trace(PURECRC32C + " not available.");
}
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Sun Dec 23 20:54:12 2012
@@ -151,7 +151,7 @@ public abstract class FSUtils {
*/
public static FSDataOutputStream create(FileSystem fs, Path path,
FsPermission perm, boolean overwrite) throws IOException {
- LOG.debug("Creating file:" + path + "with permission:" + perm);
+ LOG.debug("Creating file=" + path + " with permission=" + perm);
return fs.create(path, perm, overwrite,
fs.getConf().getInt("io.file.buffer.size", 4096),
@@ -1013,6 +1013,25 @@ public abstract class FSUtils {
}
/**
+ * Given a particular region dir, return all the family dirs inside it
+ *
+ * @param fs A file system for the Path
+ * @param regionDir Path to a specific region directory
+ * @return List of paths to valid family directories in region dir.
+ * @throws IOException
+ */
+ public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
+ // assumes we are in a region dir.
+ FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
+ List<Path> familyDirs = new ArrayList<Path>(fds.length);
+ for (FileStatus fdfs: fds) {
+ Path fdPath = fdfs.getPath();
+ familyDirs.add(fdPath);
+ }
+ return familyDirs;
+ }
+
+ /**
* Filter for HFiles that excludes reference files.
*/
public static class HFileFilter implements PathFilter {
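A short usage sketch for the new getFamilyDirs helper (filesystem and path are illustrative):

    FileSystem fs = FileSystem.get(conf);
    Path regionDir = new Path("/hbase/t1/1028785192");  // some region dir
    for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
      System.out.println("family: " + familyDir.getName());
    }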
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Sun Dec 23 20:54:12 2012
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
@@ -93,6 +95,9 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import com.google.common.base.Joiner;
@@ -146,7 +151,7 @@ import com.google.common.collect.TreeMul
* can be used to limit the kinds of repairs hbck will do. See the code in
* {@link #printUsageAndExit()} for more details.
*/
-public class HBaseFsck {
+public class HBaseFsck extends Configured implements Tool {
public static final long DEFAULT_TIME_LAG = 60000; // default value of 1 minute
public static final long DEFAULT_SLEEP_BEFORE_RERUN = 10000;
private static final int MAX_NUM_THREADS = 50; // #threads to contact regions
@@ -159,7 +164,6 @@ public class HBaseFsck {
* Internal resources
**********************/
private static final Log LOG = LogFactory.getLog(HBaseFsck.class.getName());
- private Configuration conf;
private ClusterStatus status;
private HConnection connection;
private HBaseAdmin admin;
@@ -176,12 +180,14 @@ public class HBaseFsck {
private long timelag = DEFAULT_TIME_LAG; // tables whose modtime is older
private boolean fixAssignments = false; // fix assignment errors?
private boolean fixMeta = false; // fix meta errors?
+ private boolean checkHdfs = true; // load and check fs consistency?
private boolean fixHdfsHoles = false; // fix fs holes?
private boolean fixHdfsOverlaps = false; // fix fs overlaps (risky)
private boolean fixHdfsOrphans = false; // fix fs holes (missing .regioninfo)
private boolean fixTableOrphans = false; // fix fs holes (missing .tableinfo)
private boolean fixVersionFile = false; // fix missing hbase.version file in hdfs
private boolean fixSplitParents = false; // fix lingering split parents
+ private boolean fixReferenceFiles = false; // fix lingering reference store file
// limit checking/fixes to listed tables, if empty attempt to check/fix all
// -ROOT- and .META. are always checked
@@ -199,7 +205,7 @@ public class HBaseFsck {
/*********
* State
*********/
- private ErrorReporter errors = new PrintingErrorReporter();
+ final private ErrorReporter errors;
int fixes = 0;
/**
@@ -240,8 +246,9 @@ public class HBaseFsck {
* @throws ZooKeeperConnectionException if unable to connect to ZooKeeper
*/
public HBaseFsck(Configuration conf) throws MasterNotRunningException,
- ZooKeeperConnectionException, IOException {
- this.conf = conf;
+ ZooKeeperConnectionException, IOException, ClassNotFoundException {
+ super(conf);
+ errors = getErrorReporter(conf);
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
executor = new ScheduledThreadPoolExecutor(numThreads);
@@ -258,8 +265,9 @@ public class HBaseFsck {
* if unable to connect to ZooKeeper
*/
public HBaseFsck(Configuration conf, ExecutorService exec) throws MasterNotRunningException,
- ZooKeeperConnectionException, IOException {
- this.conf = conf;
+ ZooKeeperConnectionException, IOException, ClassNotFoundException {
+ super(conf);
+ errors = getErrorReporter(getConf());
this.executor = exec;
}
@@ -268,8 +276,8 @@ public class HBaseFsck {
* online state.
*/
public void connect() throws IOException {
- admin = new HBaseAdmin(conf);
- meta = new HTable(conf, HConstants.META_TABLE_NAME);
+ admin = new HBaseAdmin(getConf());
+ meta = new HTable(getConf(), HConstants.META_TABLE_NAME);
status = admin.getMaster().getClusterStatus();
connection = admin.getConnection();
}
@@ -333,11 +341,11 @@ public class HBaseFsck {
*/
public void offlineHdfsIntegrityRepair() throws IOException, InterruptedException {
// Initial pass to fix orphans.
- if (shouldFixHdfsOrphans() || shouldFixHdfsHoles()
- || shouldFixHdfsOverlaps() || shouldFixTableOrphans()) {
+ if (shouldCheckHdfs() && (shouldFixHdfsOrphans() || shouldFixHdfsHoles()
+ || shouldFixHdfsOverlaps() || shouldFixTableOrphans())) {
LOG.info("Loading regioninfos HDFS");
// if nothing is happening this should always complete in two iterations.
- int maxIterations = conf.getInt("hbase.hbck.integrityrepair.iterations.max", 3);
+ int maxIterations = getConf().getInt("hbase.hbck.integrityrepair.iterations.max", 3);
int curIter = 0;
do {
clearState(); // clears hbck state and resets fixes to 0.
@@ -391,8 +399,10 @@ public class HBaseFsck {
loadDeployedRegions();
// load regiondirs and regioninfos from HDFS
- loadHdfsRegionDirs();
- loadHdfsRegionInfos();
+ if (shouldCheckHdfs()) {
+ loadHdfsRegionDirs();
+ loadHdfsRegionInfos();
+ }
// Empty cells in .META.?
reportEmptyMetaCells();
@@ -429,6 +439,8 @@ public class HBaseFsck {
admin.setBalancerRunning(oldBalancer, false);
}
+ offlineReferenceFileRepair();
+
// Print table summary
printTableSummary(tablesInfo);
return errors.summarize();
@@ -455,7 +467,7 @@ public class HBaseFsck {
*/
private void adoptHdfsOrphan(HbckInfo hi) throws IOException {
Path p = hi.getHdfsRegionDir();
- FileSystem fs = p.getFileSystem(conf);
+ FileSystem fs = p.getFileSystem(getConf());
FileStatus[] dirs = fs.listStatus(p);
if (dirs == null) {
LOG.warn("Attempt to adopt ophan hdfs region skipped becuase no files present in " +
@@ -480,7 +492,7 @@ public class HBaseFsck {
byte[] start, end;
HFile.Reader hf = null;
try {
- CacheConfig cacheConf = new CacheConfig(conf);
+ CacheConfig cacheConf = new CacheConfig(getConf());
hf = HFile.createReader(fs, hfile.getPath(), cacheConf);
hf.loadFileInfo();
KeyValue startKv = KeyValue.createKeyValueFromKey(hf.getFirstKey());
@@ -528,7 +540,7 @@ public class HBaseFsck {
// create new region on hdfs. move data into place.
HRegionInfo hri = new HRegionInfo(template.getName(), orphanRegionRange.getFirst(), orphanRegionRange.getSecond());
LOG.info("Creating new region : " + hri);
- HRegion region = HBaseFsckRepair.createHDFSRegionDir(conf, hri, template);
+ HRegion region = HBaseFsckRepair.createHDFSRegionDir(getConf(), hri, template);
Path target = region.getRegionDir();
// rename all the data to new region
@@ -585,6 +597,67 @@ public class HBaseFsck {
}
/**
+ * Scan all the store file names to find any lingering reference files,
+ * which refer to non-existent files. If the "fix" option is enabled,
+ * any lingering reference file will be sidelined if found.
+ * <p>
+ * A lingering reference file prevents a region from opening; it has to
+ * be fixed before a cluster can start properly.
+ */
+ private void offlineReferenceFileRepair() throws IOException {
+ Configuration conf = getConf();
+ Path hbaseRoot = FSUtils.getRootDir(conf);
+ FileSystem fs = hbaseRoot.getFileSystem(conf);
+ Map<String, Path> allFiles = FSUtils.getTableStoreFilePathMap(fs, hbaseRoot);
+ for (Path path: allFiles.values()) {
+ boolean isReference = false;
+ try {
+ isReference = StoreFile.isReference(path);
+ } catch (Throwable t) {
+ // Ignore. Some files may not be store files at all.
+ // For example, files under .oldlogs folder in .META.
+ // Warning message is already logged by
+ // StoreFile#isReference.
+ }
+ if (!isReference) continue;
+
+ Path referredToFile = StoreFile.getReferredToFile(path);
+ if (fs.exists(referredToFile)) continue; // good, expected
+
+ // Found a lingering reference file
+ errors.reportError(ERROR_CODE.LINGERING_REFERENCE_HFILE,
+ "Found lingering reference file " + path);
+ if (!shouldFixReferenceFiles()) continue;
+
+ // Now, trying to fix it since requested
+ boolean success = false;
+ String pathStr = path.toString();
+
+ // A reference file path should be like
+ // ${hbase.rootdir}/table_name/region_id/family_name/referred_file.region_name
+ // Up 3 directories to get the table folder.
+ // So the file will be sidelined to a similar folder structure.
+ int index = pathStr.lastIndexOf(Path.SEPARATOR_CHAR);
+ for (int i = 0; index > 0 && i < 3; i++) {
+ index = pathStr.lastIndexOf(Path.SEPARATOR_CHAR, index - 1);
+ }
+ if (index > 0) {
+ Path rootDir = getSidelineDir();
+ Path dst = new Path(rootDir, pathStr.substring(index));
+ fs.mkdirs(dst.getParent());
+ LOG.info("Trying to sildeline reference file"
+ + path + " to " + dst);
+ setShouldRerun();
+
+ success = fs.rename(path, dst);
+ }
+ if (!success) {
+ LOG.error("Failed to sideline reference file " + path);
+ }
+ }
+ }
+
+ /**
* TODO -- need to add tests for this.
*/
private void reportEmptyMetaCells() {
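For clarity on the sidelining path math in offlineReferenceFileRepair above: walking back over three separators keeps the table/region/family structure of the reference file when it is moved under the sideline dir. A worked example with a hypothetical layout:

    String pathStr = "/hbase/t1/region2/cf/hfile.region1";
    int index = pathStr.lastIndexOf('/');            // before "hfile.region1"
    for (int i = 0; index > 0 && i < 3; i++) {
      index = pathStr.lastIndexOf('/', index - 1);   // back over cf, region2, t1
    }
    // index now points at the '/' before "t1", so the sidelined copy becomes
    // <sidelineDir>/t1/region2/cf/hfile.region1
    String suffix = pathStr.substring(index);        // "/t1/region2/cf/hfile.region1"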
@@ -640,7 +713,7 @@ public class HBaseFsck {
}
Path regioninfo = new Path(regionDir, HRegion.REGIONINFO_FILE);
- FileSystem fs = regioninfo.getFileSystem(conf);
+ FileSystem fs = regioninfo.getFileSystem(getConf());
FSDataInputStream in = fs.open(regioninfo);
HRegionInfo hri = new HRegionInfo();
@@ -715,11 +788,11 @@ public class HBaseFsck {
if (modTInfo == null) {
// only executed once per table.
modTInfo = new TableInfo(tableName);
- Path hbaseRoot = FSUtils.getRootDir(conf);
+ Path hbaseRoot = FSUtils.getRootDir(getConf());
tablesInfo.put(tableName, modTInfo);
try {
HTableDescriptor htd =
- FSTableDescriptors.getTableDescriptor(hbaseRoot.getFileSystem(conf),
+ FSTableDescriptors.getTableDescriptor(hbaseRoot.getFileSystem(getConf()),
hbaseRoot, tableName);
modTInfo.htds.add(htd);
} catch (IOException ioe) {
@@ -748,7 +821,7 @@ public class HBaseFsck {
*/
private Set<String> getColumnFamilyList(Set<String> columns, HbckInfo hbi) throws IOException {
Path regionDir = hbi.getHdfsRegionDir();
- FileSystem fs = regionDir.getFileSystem(conf);
+ FileSystem fs = regionDir.getFileSystem(getConf());
FileStatus[] subDirs = fs.listStatus(regionDir, new FSUtils.FamilyDirFilter(fs));
for (FileStatus subdir : subDirs) {
String columnfamily = subdir.getPath().getName();
@@ -771,7 +844,7 @@ public class HBaseFsck {
for (String columnfamily : columns) {
htd.addFamily(new HColumnDescriptor(columnfamily));
}
- FSTableDescriptors.createTableDescriptor(htd, conf, true);
+ FSTableDescriptors.createTableDescriptor(htd, getConf(), true);
return true;
}
@@ -787,12 +860,12 @@ public class HBaseFsck {
public void fixOrphanTables() throws IOException {
if (shouldFixTableOrphans() && !orphanTableDirs.isEmpty()) {
- Path hbaseRoot = FSUtils.getRootDir(conf);
+ Path hbaseRoot = FSUtils.getRootDir(getConf());
List<String> tmpList = new ArrayList<String>();
tmpList.addAll(orphanTableDirs.keySet());
HTableDescriptor[] htds = getHTableDescriptors(tmpList);
- Iterator iter = orphanTableDirs.entrySet().iterator();
- int j = 0;
+ Iterator<Entry<String, Set<String>>> iter = orphanTableDirs.entrySet().iterator();
+ int j = 0;
int numFailedCase = 0;
while (iter.hasNext()) {
Entry<String, Set<String>> entry = (Entry<String, Set<String>>) iter.next();
@@ -803,7 +876,7 @@ public class HBaseFsck {
HTableDescriptor htd = htds[j];
LOG.info("fixing orphan table: " + tableName + " from cache");
FSTableDescriptors.createTableDescriptor(
- hbaseRoot.getFileSystem(conf), hbaseRoot, htd, true);
+ hbaseRoot.getFileSystem(getConf()), hbaseRoot, htd, true);
j++;
iter.remove();
}
@@ -842,8 +915,8 @@ public class HBaseFsck {
* @return an open .META. HRegion
*/
private HRegion createNewRootAndMeta() throws IOException {
- Path rootdir = new Path(conf.get(HConstants.HBASE_DIR));
- Configuration c = conf;
+ Path rootdir = new Path(getConf().get(HConstants.HBASE_DIR));
+ Configuration c = getConf();
HRegionInfo rootHRI = new HRegionInfo(HRegionInfo.ROOT_REGIONINFO);
MasterFileSystem.setInfoFamilyCachingForRoot(false);
HRegionInfo metaHRI = new HRegionInfo(HRegionInfo.FIRST_META_REGIONINFO);
@@ -986,7 +1059,7 @@ public class HBaseFsck {
for (TableInfo tInfo : tablesInfo.values()) {
TableIntegrityErrorHandler handler;
if (fixHoles || fixOverlaps) {
- handler = tInfo.new HDFSIntegrityFixer(tInfo, errors, conf,
+ handler = tInfo.new HDFSIntegrityFixer(tInfo, errors, getConf(),
fixHoles, fixOverlaps);
} else {
handler = tInfo.new IntegrityFixSuggester(tInfo, errors);
@@ -1001,7 +1074,7 @@ public class HBaseFsck {
private Path getSidelineDir() throws IOException {
if (sidelineDir == null) {
- Path hbaseDir = FSUtils.getRootDir(conf);
+ Path hbaseDir = FSUtils.getRootDir(getConf());
Path hbckDir = new Path(hbaseDir, HConstants.HBCK_SIDELINEDIR_NAME);
sidelineDir = new Path(hbckDir, hbaseDir.getName() + "-"
+ startMillis);
@@ -1118,8 +1191,8 @@ public class HBaseFsck {
*/
Path sidelineOldRootAndMeta() throws IOException {
// put current -ROOT- and .META. aside.
- Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
- FileSystem fs = hbaseDir.getFileSystem(conf);
+ Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+ FileSystem fs = hbaseDir.getFileSystem(getConf());
Path backupDir = getSidelineDir();
fs.mkdirs(backupDir);
@@ -1151,7 +1224,7 @@ public class HBaseFsck {
*/
private void loadDisabledTables()
throws ZooKeeperConnectionException, IOException {
- HConnectionManager.execute(new HConnectable<Void>(conf) {
+ HConnectionManager.execute(new HConnectable<Void>(getConf()) {
@Override
public Void connect(HConnection connection) throws IOException {
ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
@@ -1179,8 +1252,8 @@ public class HBaseFsck {
* regionInfoMap
*/
public void loadHdfsRegionDirs() throws IOException, InterruptedException {
- Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
- FileSystem fs = rootDir.getFileSystem(conf);
+ Path rootDir = new Path(getConf().get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(getConf());
// list all tables from HDFS
List<FileStatus> tableDirs = Lists.newArrayList();
@@ -1208,8 +1281,8 @@ public class HBaseFsck {
LOG.info("Trying to create a new " + HConstants.VERSION_FILE_NAME
+ " file.");
setShouldRerun();
- FSUtils.setVersion(fs, rootDir, conf.getInt(
- HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000), conf.getInt(
+ FSUtils.setVersion(fs, rootDir, getConf().getInt(
+ HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000), getConf().getInt(
HConstants.VERSION_FILE_WRITE_ATTEMPTS,
HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
}
@@ -1335,8 +1408,8 @@ public class HBaseFsck {
return;
}
- Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
- FileSystem fs = hbaseDir.getFileSystem(conf);
+ Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+ FileSystem fs = hbaseDir.getFileSystem(getConf());
UserGroupInformation ugi = User.getCurrent().getUGI();
FileStatus[] files = fs.listStatus(hbaseDir);
for (FileStatus file : files) {
@@ -1492,8 +1565,12 @@ public class HBaseFsck {
errors.print(msg);
undeployRegions(hbi);
setShouldRerun();
- HBaseFsckRepair.fixUnassigned(admin, hbi.getHdfsHRI());
- HBaseFsckRepair.waitUntilAssigned(admin, hbi.getHdfsHRI());
+ HRegionInfo hri = hbi.getHdfsHRI();
+ if (hri == null) {
+ hri = hbi.metaEntry;
+ }
+ HBaseFsckRepair.fixUnassigned(admin, hri);
+ HBaseFsckRepair.waitUntilAssigned(admin, hri);
}
}
@@ -1505,7 +1582,8 @@ public class HBaseFsck {
String descriptiveName = hbi.toString();
boolean inMeta = hbi.metaEntry != null;
- boolean inHdfs = hbi.getHdfsRegionDir()!= null;
+ // In case not checking HDFS, assume the region is on HDFS
+ boolean inHdfs = !shouldCheckHdfs() || hbi.getHdfsRegionDir() != null;
boolean hasMetaAssignment = inMeta && hbi.metaEntry.regionServer != null;
boolean isDeployed = !hbi.deployedOn.isEmpty();
boolean isMultiplyDeployed = hbi.deployedOn.size() > 1;
@@ -1515,7 +1593,7 @@ public class HBaseFsck {
boolean splitParent =
(hbi.metaEntry == null)? false: hbi.metaEntry.isSplit() && hbi.metaEntry.isOffline();
boolean shouldBeDeployed = inMeta && !isTableDisabled(hbi.metaEntry);
- boolean recentlyModified = hbi.getHdfsRegionDir() != null &&
+ boolean recentlyModified = inHdfs &&
hbi.getModTime() + timelag > System.currentTimeMillis();
// ========== First the healthy cases =============
@@ -1558,7 +1636,7 @@ public class HBaseFsck {
}
LOG.info("Patching .META. with .regioninfo: " + hbi.getHdfsHRI());
- HBaseFsckRepair.fixMetaHoleOnline(conf, hbi.getHdfsHRI());
+ HBaseFsckRepair.fixMetaHoleOnline(getConf(), hbi.getHdfsHRI());
tryAssignmentRepair(hbi, "Trying to reassign region...");
}
@@ -1574,7 +1652,7 @@ public class HBaseFsck {
}
LOG.info("Patching .META. with with .regioninfo: " + hbi.getHdfsHRI());
- HBaseFsckRepair.fixMetaHoleOnline(conf, hbi.getHdfsHRI());
+ HBaseFsckRepair.fixMetaHoleOnline(getConf(), hbi.getHdfsHRI());
tryAssignmentRepair(hbi, "Trying to fix unassigned region...");
}
@@ -1736,7 +1814,7 @@ public class HBaseFsck {
debugLsr(contained.getHdfsRegionDir());
// rename the contained into the container.
- FileSystem fs = targetRegionDir.getFileSystem(conf);
+ FileSystem fs = targetRegionDir.getFileSystem(getConf());
FileStatus[] dirs = fs.listStatus(contained.getHdfsRegionDir());
if (dirs == null) {
@@ -2351,7 +2429,7 @@ public class HBaseFsck {
HTableDescriptor[] htd = new HTableDescriptor[0];
try {
LOG.info("getHTableDescriptors == tableNames => " + tableNames);
- htd = new HBaseAdmin(conf).getTableDescriptors(tableNames);
+ htd = new HBaseAdmin(getConf()).getTableDescriptors(tableNames);
} catch (IOException e) {
LOG.debug("Exception getting table descriptors", e);
}
@@ -2494,12 +2572,12 @@ public class HBaseFsck {
};
// Scan -ROOT- to pick up META regions
- MetaScanner.metaScan(conf, visitor, null, null,
+ MetaScanner.metaScan(getConf(), visitor, null, null,
Integer.MAX_VALUE, HConstants.ROOT_TABLE_NAME);
if (!checkMetaOnly) {
// Scan .META. to pick up user regions
- MetaScanner.metaScan(conf, visitor);
+ MetaScanner.metaScan(getConf(), visitor);
}
errors.print("");
@@ -2753,6 +2831,12 @@ public class HBaseFsck {
}
}
+ private static ErrorReporter getErrorReporter(
+ final Configuration conf) throws ClassNotFoundException {
+ Class<? extends ErrorReporter> reporter = conf.getClass("hbasefsck.errorreporter", PrintingErrorReporter.class, ErrorReporter.class);
+ return (ErrorReporter)ReflectionUtils.newInstance(reporter, conf);
+ }
+
public interface ErrorReporter {
public static enum ERROR_CODE {
UNKNOWN, NO_META_REGION, NULL_ROOT_REGION, NO_VERSION_FILE, NOT_IN_META_HDFS, NOT_IN_META,
@@ -2760,7 +2844,7 @@ public class HBaseFsck {
MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
- ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE
+ ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE
}
public void clear();
public void report(String message);
@@ -2778,7 +2862,7 @@ public class HBaseFsck {
public boolean tableHasErrors(TableInfo table);
}
- private static class PrintingErrorReporter implements ErrorReporter {
+ static class PrintingErrorReporter implements ErrorReporter {
public int errorCount = 0;
private int showProgress;
@@ -3128,6 +3212,14 @@ public class HBaseFsck {
return fixMeta;
}
+ public void setCheckHdfs(boolean checking) {
+ checkHdfs = checking;
+ }
+
+ boolean shouldCheckHdfs() {
+ return checkHdfs;
+ }
+
public void setFixHdfsHoles(boolean shouldFix) {
fixHdfsHoles = shouldFix;
}
@@ -3184,6 +3276,14 @@ public class HBaseFsck {
return fixSplitParents;
}
+ public void setFixReferenceFiles(boolean shouldFix) {
+ fixReferenceFiles = shouldFix;
+ }
+
+ boolean shouldFixReferenceFiles() {
+ return fixReferenceFiles;
+ }
+
public boolean shouldIgnorePreCheckPermission() {
return ignorePreCheckPermission;
}
@@ -3245,7 +3345,7 @@ public class HBaseFsck {
}
protected HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
- return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles);
+ return new HFileCorruptionChecker(getConf(), executor, sidelineCorruptHFiles);
}
public HFileCorruptionChecker getHFilecorruptionChecker() {
@@ -3283,6 +3383,8 @@ public class HBaseFsck {
System.err.println(" -fix Try to fix region assignments. This is for backwards compatiblity");
System.err.println(" -fixAssignments Try to fix region assignments. Replaces the old -fix");
System.err.println(" -fixMeta Try to fix meta problems. This assumes HDFS region info is good.");
+ System.err.println(" -noHdfsChecking Don't load/check region info from HDFS."
+ + " Assumes META region info is good. Won't check/fix any HDFS issue, e.g. hole, orphan, or overlap");
System.err.println(" -fixHdfsHoles Try to fix region holes in hdfs.");
System.err.println(" -fixHdfsOrphans Try to fix region dirs with no .regioninfo file in hdfs");
System.err.println(" -fixTableOrphans Try to fix table dirs with no .tableinfo file in hdfs (online mode only)");
@@ -3293,6 +3395,7 @@ public class HBaseFsck {
System.err.println(" -maxOverlapsToSideline <n> When fixing region overlaps, allow at most <n> regions to sideline per group. (n=" + DEFAULT_OVERLAPS_TO_SIDELINE +" by default)");
System.err.println(" -fixSplitParents Try to force offline split parents to be online.");
System.err.println(" -ignorePreCheckPermission ignore filesystem permission pre-check");
+ System.err.println(" -fixReferenceFiles Try to offline lingering reference store files");
System.err.println("");
System.err.println(" Datafile Repair options: (expert features, use with caution!)");
@@ -3302,7 +3405,7 @@ public class HBaseFsck {
System.err.println("");
System.err.println(" Metadata Repair shortcuts");
System.err.println(" -repair Shortcut for -fixAssignments -fixMeta -fixHdfsHoles " +
- "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps");
+ "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles");
System.err.println(" -repairHoles Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
setRetCode(-2);
@@ -3316,7 +3419,6 @@ public class HBaseFsck {
* @throws Exception
*/
public static void main(String[] args) throws Exception {
-
// create a fsck object
Configuration conf = HBaseConfiguration.create();
Path hbasedir = new Path(conf.get(HConstants.HBASE_DIR));
@@ -3324,12 +3426,14 @@ public class HBaseFsck {
conf.set("fs.defaultFS", defaultFs.toString()); // for hadoop 0.21+
conf.set("fs.default.name", defaultFs.toString()); // for hadoop 0.20
- int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
- ExecutorService exec = new ScheduledThreadPoolExecutor(numThreads);
- HBaseFsck hbck = new HBaseFsck(conf, exec);
- hbck.exec(exec, args);
- int retcode = hbck.getRetCode();
- Runtime.getRuntime().exit(retcode);
+ int ret = ToolRunner.run(new HBaseFsck(conf), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ exec(executor, args);
+ return getRetCode();
}
public HBaseFsck exec(ExecutorService exec, String[] args) throws KeeperException, IOException,
@@ -3386,6 +3490,8 @@ public class HBaseFsck {
setFixAssignments(true);
} else if (cmd.equals("-fixMeta")) {
setFixMeta(true);
+ } else if (cmd.equals("-noHdfsChecking")) {
+ setCheckHdfs(false);
} else if (cmd.equals("-fixHdfsHoles")) {
setFixHdfsHoles(true);
} else if (cmd.equals("-fixHdfsOrphans")) {
@@ -3406,6 +3512,8 @@ public class HBaseFsck {
checkCorruptHFiles = true;
} else if (cmd.equals("-sidelineCorruptHFiles")) {
sidelineCorruptHFiles = true;
+ } else if (cmd.equals("-fixReferenceFiles")) {
+ setFixReferenceFiles(true);
} else if (cmd.equals("-repair")) {
// this attempts to merge overlapping hdfs regions, needs testing
// under load
@@ -3417,6 +3525,8 @@ public class HBaseFsck {
setFixVersionFile(true);
setSidelineBigOverlaps(true);
setFixSplitParents(false);
+ setCheckHdfs(true);
+ setFixReferenceFiles(true);
} else if (cmd.equals("-repairHoles")) {
// this will make all missing hdfs regions available but may lose data
setFixHdfsHoles(true);
@@ -3426,6 +3536,7 @@ public class HBaseFsck {
setFixHdfsOverlaps(false);
setSidelineBigOverlaps(false);
setFixSplitParents(false);
+ setCheckHdfs(true);
} else if (cmd.equals("-maxOverlapsToSideline")) {
if (i == args.length - 1) {
System.err.println("-maxOverlapsToSideline needs a numeric value argument.");
@@ -3484,13 +3595,13 @@ public class HBaseFsck {
setHFileCorruptionChecker(hfcc); // so we can get result
Collection<String> tables = getIncludedTables();
Collection<Path> tableDirs = new ArrayList<Path>();
- Path rootdir = FSUtils.getRootDir(conf);
+ Path rootdir = FSUtils.getRootDir(getConf());
if (tables.size() > 0) {
for (String t : tables) {
tableDirs.add(FSUtils.getTablePath(rootdir, t));
}
} else {
- tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(conf), rootdir);
+ tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(getConf()), rootdir);
}
hfcc.checkTables(tableDirs);
PrintWriter out = new PrintWriter(System.out);
@@ -3530,14 +3641,14 @@ public class HBaseFsck {
* ls -r for debugging purposes
*/
void debugLsr(Path p) throws IOException {
- debugLsr(conf, p);
+ debugLsr(getConf(), p);
}
/**
* ls -r for debugging purposes
*/
public static void debugLsr(Configuration conf, Path p) throws IOException {
- if (!LOG.isDebugEnabled()) {
+ if (!LOG.isDebugEnabled() || p == null) {
return;
}
FileSystem fs = p.getFileSystem(conf);
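Since HBaseFsck is now a Tool, the entry point goes through ToolRunner, which parses generic Hadoop options (-D, -conf, and so on) before hbck's own flags, including the new -noHdfsChecking and -fixReferenceFiles. A sketch of a programmatic invocation (flag choice illustrative):

    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(new HBaseFsck(conf),
        new String[] { "-noHdfsChecking", "-fixAssignments" });
    System.exit(ret);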
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/Threads.java Sun Dec 23 20:54:12 2012
@@ -126,7 +126,7 @@ public class Threads {
/**
* @param millis How long to sleep for in milliseconds.
*/
- public static void sleep(int millis) {
+ public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
@@ -158,15 +158,15 @@ public class Threads {
}
/**
- * Create a new CachedThreadPool with a bounded number as the maximum
+ * Create a new CachedThreadPool with a bounded number as the maximum
* thread size in the pool.
- *
+ *
* @param maxCachedThread the maximum thread could be created in the pool
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param threadFactory the factory to use when creating new threads
- * @return threadPoolExecutor the cachedThreadPool with a bounded number
- * as the maximum thread size in the pool.
+ * @return threadPoolExecutor the cachedThreadPool with a bounded number
+ * as the maximum thread size in the pool.
*/
public static ThreadPoolExecutor getBoundedCachedThreadPool(
int maxCachedThread, long timeout, TimeUnit unit,
@@ -178,12 +178,13 @@ public class Threads {
boundedCachedThreadPool.allowCoreThreadTimeOut(true);
return boundedCachedThreadPool;
}
-
-
+
+
/**
- * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
- * with a common prefix.
- * @param prefix The prefix of every created Thread's name
+ * Returns a {@link java.util.concurrent.ThreadFactory} that names each
+ * created thread uniquely, with a common prefix.
+ *
+ * @param prefix The prefix of every created Thread's name
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
*/
public static ThreadFactory getNamedThreadFactory(final String prefix) {
@@ -205,6 +206,7 @@ public class Threads {
};
}
+
/**
* Get a named {@link ThreadFactory} that just builds daemon threads
* @param prefix name prefix for all threads created from the factory
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java Sun Dec 23 20:54:12 2012
@@ -61,6 +61,12 @@ public class HQuorumPeer {
writeMyID(zkProperties);
QuorumPeerConfig zkConfig = new QuorumPeerConfig();
zkConfig.parseProperties(zkProperties);
+
+ // login the zookeeper server principal (if using security)
+ ZKUtil.loginServer(conf, "hbase.zookeeper.server.keytab.file",
+ "hbase.zookeeper.server.kerberos.principal",
+ zkConfig.getClientPortAddress().getHostName());
+
runZKServer(zkConfig);
} catch (Exception e) {
e.printStackTrace();
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Sun Dec 23 20:54:12 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.zookeepe
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -32,11 +33,16 @@ import org.apache.hadoop.hbase.util.Retr
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
/**
* A zookeeper that can handle 'recoverable' errors.
@@ -490,6 +496,61 @@ public class RecoverableZooKeeper {
}
}
+ /**
+ * Convert the Iterable of {@link ZKOp} we got into ZooKeeper.Op
+ * instances to actually pass to multi (needed in order to appendMetaData).
+ */
+ private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
+ throws UnsupportedOperationException {
+ if(ops == null) return null;
+
+ List<Op> preparedOps = new LinkedList<Op>();
+ for (Op op : ops) {
+ if (op.getType() == ZooDefs.OpCode.create) {
+ CreateRequest create = (CreateRequest)op.toRequestRecord();
+ preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
+ create.getAcl(), create.getFlags()));
+ } else if (op.getType() == ZooDefs.OpCode.delete) {
+ // no need to appendMetaData for delete
+ preparedOps.add(op);
+ } else if (op.getType() == ZooDefs.OpCode.setData) {
+ SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
+ preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
+ setData.getVersion()));
+ } else {
+ throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
+ }
+ }
+ return preparedOps;
+ }
+
+ /**
+ * Run multiple operations in a transactional manner. Retry before throwing an exception.
+ */
+ public List<OpResult> multi(Iterable<Op> ops)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ Iterable<Op> multiOps = prepareZKMulti(ops);
+ while (true) {
+ try {
+ return zk.multi(multiOps);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case SESSIONEXPIRED:
+ case OPERATIONTIMEOUT:
+ retryOrThrow(retryCounter, e, "multi");
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
private String findPreviousSequentialNode(String path)
throws KeeperException, InterruptedException {
int lastSlashIdx = path.lastIndexOf('/');
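A hedged usage sketch for the new multi() (znode paths and data are illustrative):

    List<Op> ops = new ArrayList<Op>();
    ops.add(Op.create("/hbase/example", data,
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag()));
    ops.add(Op.delete("/hbase/stale-node", -1));  // -1 matches any version
    // All-or-nothing on the server; retried here on connection loss,
    // session expiry, and operation timeout.
    List<OpResult> results = recoverableZooKeeper.multi(ops);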
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java Sun Dec 23 20:54:12 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.zookeepe
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.zookeeper.KeeperException;
/**
@@ -228,16 +230,19 @@ public class ZKTable {
}
}
synchronized (this.cache) {
+ List<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
if (settingToEnabled) {
- ZKUtil.deleteNodeFailSilent(this.watcher, znode92);
+ ops.add(ZKUtilOp.deleteNodeFailSilent(znode92));
}
else {
- ZKUtil.setData(this.watcher, znode92, Bytes.toBytes(state.toString()));
+ ops.add(ZKUtilOp.setData(znode92, Bytes.toBytes(state.toString())));
}
- // Set the current format znode after the 0.92 format znode.
+ // When the multi-update is not used (either because it is disabled by
+ // configuration or because the multi failed), set the current format znode
+ // after the 0.92 format znode.
// This is so in the case of failure, the AssignmentManager is guaranteed to
// see the state was not applied, since it uses the current format znode internally.
- ZKUtil.setData(this.watcher, znode, Bytes.toBytes(state.toString()));
+ ops.add(ZKUtilOp.setData(znode, Bytes.toBytes(state.toString())));
+ ZKUtil.multiOrSequential(this.watcher, ops, true);
this.cache.put(tableName, state);
}
}
@@ -292,13 +297,16 @@ public class ZKTable {
public void setDeletedTable(final String tableName)
throws KeeperException {
synchronized (this.cache) {
- ZKUtil.deleteNodeFailSilent(this.watcher,
- ZKUtil.joinZNode(this.watcher.masterTableZNode92, tableName));
- // Delete the current format znode after the 0.92 format znode.
- // This is so in the case of failure, the AssignmentManager is guaranteed to
- // see the table was not deleted, since it uses the current format znode internally.
- ZKUtil.deleteNodeFailSilent(this.watcher,
- ZKUtil.joinZNode(this.watcher.masterTableZNode, tableName));
+ List<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
+ ops.add(ZKUtilOp.deleteNodeFailSilent(
+ ZKUtil.joinZNode(this.watcher.masterTableZNode92, tableName)));
+ // When the multi-update is not used (either because it is disabled by
+ // configuration or because the multi failed), delete the current format znode
+ // after the 0.92 format znode. This is so that, in the case of a failure between
+ // the two deletes, the AssignmentManager is guaranteed to see the table was not
+ // deleted, since it uses the current format znode internally.
+ ops.add(ZKUtilOp.deleteNodeFailSilent(
+ ZKUtil.joinZNode(this.watcher.masterTableZNode, tableName)));
+ ZKUtil.multiOrSequential(this.watcher, ops, true);
if (this.cache.remove(tableName) == null) {
LOG.warn("Moving table " + tableName + " state to deleted but was " +
"already deleted");
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Sun Dec 23 20:54:12 2012
@@ -24,10 +24,21 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
+import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -39,15 +50,24 @@ import org.apache.hadoop.hbase.ServerNam
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Op;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.server.ZooKeeperSaslServer;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
/**
* Internal HBase utility class for ZooKeeper.
@@ -108,6 +128,170 @@ public class ZKUtil {
retry, retryIntervalMillis);
}
+ /**
+ * Log in the current zookeeper server process using the given configuration
+ * keys for the credential file and login principal.
+ *
+ * <p><strong>This is only applicable when running on secure HBase.</strong>
+ * On regular HBase (without security features), this will safely be ignored.
+ * </p>
+ *
+ * @param conf The configuration data to use
+ * @param keytabFileKey Property key used to configure the path to the credential file
+ * @param userNameKey Property key used to configure the login principal
+ * @param hostname Current hostname to use in any credentials
+ * @throws IOException underlying exception from SecurityUtil.login() call
+ */
+ public static void loginServer(Configuration conf, String keytabFileKey,
+ String userNameKey, String hostname) throws IOException {
+ login(conf, keytabFileKey, userNameKey, hostname,
+ ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
+ JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME);
+ }
+
+ /**
+ * Log in the current zookeeper client using the given configuration
+ * keys for the credential file and login principal.
+ *
+ * <p><strong>This is only applicable when running on secure HBase.</strong>
+ * On regular HBase (without security features), this will safely be ignored.
+ * </p>
+ *
+ * @param conf The configuration data to use
+ * @param keytabFileKey Property key used to configure the path to the credential file
+ * @param userNameKey Property key used to configure the login principal
+ * @param hostname Current hostname to use in any credentials
+ * @throws IOException underlying exception from SecurityUtil.login() call
+ */
+ public static void loginClient(Configuration conf, String keytabFileKey,
+ String userNameKey, String hostname) throws IOException {
+ login(conf, keytabFileKey, userNameKey, hostname,
+ ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
+ JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME);
+ }
+
+ /**
+ * Log in the current process using the given configuration keys for the
+ * credential file and login principal.
+ *
+ * <p><strong>This is only applicable when running on secure HBase.</strong>
+ * On regular HBase (without security features), this will safely be ignored.
+ * </p>
+ *
+ * @param conf The configuration data to use
+ * @param keytabFileKey Property key used to configure the path to the credential file
+ * @param userNameKey Property key used to configure the login principal
+ * @param hostname Current hostname to use in any credentials
+ * @param loginContextProperty property name to expose the entry name
+ * @param loginContextName jaas entry name
+ * @throws IOException underlying exception from SecurityUtil.login() call
+ */
+ private static void login(Configuration conf, String keytabFileKey,
+ String userNameKey, String hostname,
+ String loginContextProperty, String loginContextName)
+ throws IOException {
+ if (!isSecureZooKeeper(conf))
+ return;
+
+ // If the user has already supplied a jaas.conf, e.g. via
+ // HBASE_OPTS="-Djava.security.auth.login.config=jaas.conf",
+ // honor it and skip the programmatic configuration below.
+ if (System.getProperty("java.security.auth.login.config") != null)
+ return;
+
+ String keytabFilename = conf.get(keytabFileKey);
+ String principalConfig = conf.get(userNameKey, System.getProperty("user.name"));
+ String principalName = SecurityUtil.getServerPrincipal(principalConfig, hostname);
+
+ // Initialize the JAAS configuration for the keytab/principal and set the
+ // ZooKeeper login context name. If no keytab is specified, the ticket cache
+ // is used instead.
+ JaasConfiguration jaasConf = new JaasConfiguration(loginContextName,
+ principalName, keytabFilename);
+ javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+ System.setProperty(loginContextProperty, loginContextName);
+ }
+
+ /**
+ * A JAAS configuration that defines the login modules that we want to use for login.
+ */
+ private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+ private static final String SERVER_KEYTAB_KERBEROS_CONFIG_NAME =
+ "zookeeper-server-keytab-kerberos";
+ private static final String CLIENT_KEYTAB_KERBEROS_CONFIG_NAME =
+ "zookeeper-client-keytab-kerberos";
+
+ private static final Map<String, String> BASIC_JAAS_OPTIONS =
+ new HashMap<String,String>();
+ static {
+ String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
+ if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
+ BASIC_JAAS_OPTIONS.put("debug", "true");
+ }
+ }
+
+ private static final Map<String,String> KEYTAB_KERBEROS_OPTIONS =
+ new HashMap<String,String>();
+ static {
+ KEYTAB_KERBEROS_OPTIONS.put("doNotPrompt", "true");
+ KEYTAB_KERBEROS_OPTIONS.put("storeKey", "true");
+ KEYTAB_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
+ KEYTAB_KERBEROS_OPTIONS.putAll(BASIC_JAAS_OPTIONS);
+ }
+
+ private static final AppConfigurationEntry KEYTAB_KERBEROS_LOGIN =
+ new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
+ LoginModuleControlFlag.REQUIRED,
+ KEYTAB_KERBEROS_OPTIONS);
+
+ private static final AppConfigurationEntry[] KEYTAB_KERBEROS_CONF =
+ new AppConfigurationEntry[]{KEYTAB_KERBEROS_LOGIN};
+
+ private javax.security.auth.login.Configuration baseConfig;
+ private final String loginContextName;
+ private final boolean useTicketCache;
+ private final String keytabFile;
+ private final String principal;
+
+ public JaasConfiguration(String loginContextName, String principal) {
+ this(loginContextName, principal, null, true);
+ }
+
+ public JaasConfiguration(String loginContextName, String principal, String keytabFile) {
+ this(loginContextName, principal, keytabFile, keytabFile == null || keytabFile.length() == 0);
+ }
+
+ private JaasConfiguration(String loginContextName, String principal,
+ String keytabFile, boolean useTicketCache) {
+ try {
+ this.baseConfig = javax.security.auth.login.Configuration.getConfiguration();
+ } catch (SecurityException e) {
+ this.baseConfig = null;
+ }
+ this.loginContextName = loginContextName;
+ this.useTicketCache = useTicketCache;
+ this.keytabFile = keytabFile;
+ this.principal = principal;
+ LOG.info("JaasConfiguration loginContextName=" + loginContextName +
+ " principal=" + principal + " useTicketCache=" + useTicketCache +
+ " keytabFile=" + keytabFile);
+ }
+
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+ if (loginContextName.equals(appName)) {
+ if (!useTicketCache) {
+ KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
+ KEYTAB_KERBEROS_OPTIONS.put("useKeyTab", "true");
+ }
+ KEYTAB_KERBEROS_OPTIONS.put("principal", principal);
+ KEYTAB_KERBEROS_OPTIONS.put("useTicketCache", useTicketCache ? "true" : "false");
+ return KEYTAB_KERBEROS_CONF;
+ }
+ if (baseConfig != null) return baseConfig.getAppConfigurationEntry(appName);
+ return null;
+ }
+ }
+
//
// Helper methods
//
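As a usage sketch, a secure server process would invoke the new login hook once at startup, before opening its ZooKeeper connection. The configuration keys below follow the hbase.zookeeper.* naming referenced later in the isSecureZooKeeper() comments, but should be treated as illustrative here, not as keys introduced by this diff:

  import java.net.InetAddress;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  Configuration conf = HBaseConfiguration.create();
  String hostname = InetAddress.getLocalHost().getCanonicalHostName();
  // No-op unless isSecureZooKeeper(conf) is true and no jaas.conf system
  // property has been supplied by the user.
  ZKUtil.loginServer(conf,
      "hbase.zookeeper.server.keytab.file",        // path to the keytab file
      "hbase.zookeeper.server.kerberos.principal", // e.g. zookeeper/_HOST@REALM
      hostname);                                   // substituted for _HOST

Client processes would call ZKUtil.loginClient() the same way with the client-side keys.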
@@ -249,9 +433,6 @@ public class ZKUtil {
/**
* Check if the specified node exists. Sets no watches.
*
- * Returns true if node exists, false if not. Returns an exception if there
- * is an unexpected zookeeper exception.
- *
* @param zkw zk reference
* @param znode path of node to watch
* @return version of the node if it exists, -1 if does not exist
@@ -700,19 +881,29 @@ public class ZKUtil {
*/
public static void setData(ZooKeeperWatcher zkw, String znode, byte [] data)
throws KeeperException, KeeperException.NoNodeException {
- setData(zkw, znode, data, -1);
+ setData(zkw, (SetData)ZKUtilOp.setData(znode, data));
}
+ private static void setData(ZooKeeperWatcher zkw, SetData setData)
+ throws KeeperException, KeeperException.NoNodeException {
+ SetDataRequest sd = (SetDataRequest)toZooKeeperOp(zkw, setData).toRequestRecord();
+ setData(zkw, sd.getPath(), sd.getData(), sd.getVersion());
+ }
+
+ /**
+ * Returns whether or not secure authentication is enabled
+ * (that is, whether <code>hbase.security.authentication</code> is set to
+ * <code>kerberos</code>).
+ */
public static boolean isSecureZooKeeper(Configuration conf) {
- // TODO: We need a better check for security enabled ZooKeeper. Currently
- // the secure ZooKeeper client is set up using a supplied JaaS
- // configuration file. But if the system property for the JaaS
- // configuration file is set, this may not be an exclusive indication
- // that HBase should set ACLs on znodes. As an alternative, we could do
- // this more like Hadoop and build a JaaS configuration programmatically
- // based on a site conf setting. The scope of such a change will be
- // addressed in HBASE-4791.
- return (System.getProperty("java.security.auth.login.config") != null);
+ // The HBase shell needs to set
+ // -Djava.security.auth.login.config=user-jaas.conf,
+ // since each user has a different jaas.conf
+ if (System.getProperty("java.security.auth.login.config") != null)
+ return true;
+
+ // The Master and region servers use hbase.zookeeper.client.*
+ return "kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication"));
}
private static ArrayList<ACL> createACL(ZooKeeperWatcher zkw, String node) {
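Both detection routes can be exercised directly; a sketch, assuming a freshly created HBase Configuration (the values are taken from the comments above):

  Configuration conf = HBaseConfiguration.create();
  // Route 1: a per-user JAAS file, as the HBase shell supplies:
  System.setProperty("java.security.auth.login.config", "user-jaas.conf");
  // Route 2: the site-configuration route used by the Master and region servers:
  conf.set("hbase.security.authentication", "kerberos");
  // Either route on its own makes this true.
  assert ZKUtil.isSecureZooKeeper(conf);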
@@ -896,14 +1087,20 @@ public class ZKUtil {
* @throws KeeperException if unexpected zookeeper exception
*/
public static void createAndFailSilent(ZooKeeperWatcher zkw,
- String znode)
+ String znode) throws KeeperException {
+ createAndFailSilent(zkw,
+ (CreateAndFailSilent)ZKUtilOp.createAndFailSilent(znode, new byte[0]));
+ }
+
+ private static void createAndFailSilent(ZooKeeperWatcher zkw, CreateAndFailSilent cafs)
throws KeeperException {
+ CreateRequest create = (CreateRequest)toZooKeeperOp(zkw, cafs).toRequestRecord();
+ String znode = create.getPath();
try {
RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
waitForZKConnectionIfAuthenticating(zkw);
if (zk.exists(znode, false) == null) {
- zk.create(znode, new byte[0], createACL(zkw,znode),
- CreateMode.PERSISTENT);
+ zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags()));
}
} catch(KeeperException.NodeExistsException nee) {
} catch(KeeperException.NoAuthException nee){
@@ -989,8 +1186,15 @@ public class ZKUtil {
*/
public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
throws KeeperException {
+ deleteNodeFailSilent(zkw,
+ (DeleteNodeFailSilent)ZKUtilOp.deleteNodeFailSilent(node));
+ }
+
+ private static void deleteNodeFailSilent(ZooKeeperWatcher zkw,
+ DeleteNodeFailSilent dnfs) throws KeeperException {
+ DeleteRequest delete = (DeleteRequest)toZooKeeperOp(zkw, dnfs).toRequestRecord();
try {
- zkw.getRecoverableZooKeeper().delete(node, -1);
+ zkw.getRecoverableZooKeeper().delete(delete.getPath(), delete.getVersion());
} catch(KeeperException.NoNodeException nne) {
} catch(InterruptedException ie) {
zkw.interruptedException(ie);
@@ -1038,6 +1242,209 @@ public class ZKUtil {
}
}
+ /**
+ * Represents an action taken by ZKUtil, e.g. createAndFailSilent.
+ * These actions are higher-level than {@link Op} actions, which represent
+ * individual operations in the ZooKeeper API, such as create.
+ */
+ public abstract static class ZKUtilOp {
+ private String path;
+
+ private ZKUtilOp(String path) {
+ this.path = path;
+ }
+
+ /**
+ * @return a createAndFailSilent ZKUtilOp
+ */
+ public static ZKUtilOp createAndFailSilent(String path, byte[] data) {
+ return new CreateAndFailSilent(path, data);
+ }
+
+ /**
+ * @return a deleteNodeFailSilent ZKUtilOp
+ */
+ public static ZKUtilOp deleteNodeFailSilent(String path) {
+ return new DeleteNodeFailSilent(path);
+ }
+
+ /**
+ * @return a setData ZKUtilOp
+ */
+ public static ZKUtilOp setData(String path, byte [] data) {
+ return new SetData(path, data);
+ }
+
+ /**
+ * @return path to the znode where the operation will occur
+ */
+ public String getPath() {
+ return path;
+ }
+
+ /**
+ * ZKUtilOp representing createAndFailSilent in ZooKeeper
+ * (attempt to create node, ignore error if already exists)
+ */
+ public static class CreateAndFailSilent extends ZKUtilOp {
+ private byte [] data;
+
+ private CreateAndFailSilent(String path, byte [] data) {
+ super(path);
+ this.data = data;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof CreateAndFailSilent)) return false;
+
+ CreateAndFailSilent op = (CreateAndFailSilent) o;
+ return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
+ }
+
+ @Override
+ public int hashCode() {
+ // Consistent with equals, which compares both path and data.
+ return 31 * getPath().hashCode() + Arrays.hashCode(data);
+ }
+ }
+
+ /**
+ * ZKUtilOp representing deleteNodeFailSilent in ZooKeeper
+ * (attempt to delete node, ignore error if node doesn't exist)
+ */
+ public static class DeleteNodeFailSilent extends ZKUtilOp {
+ private DeleteNodeFailSilent(String path) {
+ super(path);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof DeleteNodeFailSilent)) return false;
+
+ // ZKUtilOp does not override equals, so super.equals(o) would be an
+ // identity check; compare the underlying paths instead.
+ return getPath().equals(((DeleteNodeFailSilent) o).getPath());
+ }
+
+ @Override
+ public int hashCode() {
+ return getPath().hashCode();
+ }
+ }
+
+ /**
+ * ZKUtilOp representing setData in ZooKeeper
+ * (sets the data at the given znode)
+ */
+ public static class SetData extends ZKUtilOp {
+ private byte [] data;
+
+ private SetData(String path, byte [] data) {
+ super(path);
+ this.data = data;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof SetData)) return false;
+
+ SetData op = (SetData) o;
+ return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
+ }
+
+ @Override
+ public int hashCode() {
+ // Consistent with equals, which compares both path and data.
+ return 31 * getPath().hashCode() + Arrays.hashCode(data);
+ }
+ }
+ }
+
+ /**
+ * Convert from ZKUtilOp to ZKOp
+ */
+ private static Op toZooKeeperOp(ZooKeeperWatcher zkw, ZKUtilOp op)
+ throws UnsupportedOperationException {
+ if (op == null) return null;
+
+ if (op instanceof CreateAndFailSilent) {
+ CreateAndFailSilent cafs = (CreateAndFailSilent)op;
+ return Op.create(cafs.getPath(), cafs.getData(), createACL(zkw, cafs.getPath()),
+ CreateMode.PERSISTENT);
+ } else if (op instanceof DeleteNodeFailSilent) {
+ DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
+ return Op.delete(dnfs.getPath(), -1);
+ } else if (op instanceof SetData) {
+ SetData sd = (SetData)op;
+ return Op.setData(sd.getPath(), sd.getData(), -1);
+ } else {
+ throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
+ + op.getClass().getName());
+ }
+ }
+
+ /**
+ * If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update functionality.
+ * Otherwise, run the list of operations sequentially.
+ *
+ * If all of the following are true:
+ * - runSequentialOnMultiFailure is true
+ * - hbase.zookeeper.useMulti is true
+ * - on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*)
+ * Then:
+ * - we retry the operations one-by-one (sequentially)
+ *
+ * Note *: an example is receiving a NodeExistsException from a "create" call. Without multi,
+ * a user could call "createAndFailSilent" to ensure that a node exists if they don't care who
+ * actually created the node (i.e. the NodeExistsException from ZooKeeper is caught).
+ * Under multi, however, the NodeExistsException thrown by zk.create fails the entire
+ * transaction, taking every other operation down with it. In that case, if the conditions
+ * above hold, the operations are re-run sequentially, which should produce the correct
+ * final state but means they will not run atomically.
+ *
+ * @throws KeeperException
+ */
+ public static void multiOrSequential(ZooKeeperWatcher zkw, List<ZKUtilOp> ops,
+ boolean runSequentialOnMultiFailure) throws KeeperException {
+ if (ops == null) return;
+ boolean useMulti = zkw.getConfiguration().getBoolean(HConstants.ZOOKEEPER_USEMULTI, false);
+
+ if (useMulti) {
+ List<Op> zkOps = new LinkedList<Op>();
+ for (ZKUtilOp op : ops) {
+ zkOps.add(toZooKeeperOp(zkw, op));
+ }
+ try {
+ zkw.getRecoverableZooKeeper().multi(zkOps);
+ // The multi succeeded; do not fall through to the sequential path below.
+ return;
+ } catch (KeeperException ke) {
+ switch (ke.code()) {
+ case NODEEXISTS:
+ case NONODE:
+ case BADVERSION:
+ case NOAUTH:
+ // if we get an exception that could be solved by running sequentially
+ // (and the client asked us to), then break out and run sequentially
+ if (runSequentialOnMultiFailure) {
+ LOG.info("On call to ZK.multi, received exception: " + ke.toString() + "."
+ + " Attempting to run operations sequentially because"
+ + " runSequentialOnMultiFailure is: " + runSequentialOnMultiFailure + ".");
+ break;
+ }
+ default:
+ throw ke;
+ }
+ } catch (InterruptedException ie) {
+ zkw.interruptedException(ie);
+ }
+ }
+
+ // run sequentially
+ for (ZKUtilOp op : ops) {
+ if (op instanceof CreateAndFailSilent) {
+ createAndFailSilent(zkw, (CreateAndFailSilent)op);
+ } else if (op instanceof DeleteNodeFailSilent) {
+ deleteNodeFailSilent(zkw, (DeleteNodeFailSilent)op);
+ } else if (op instanceof SetData) {
+ setData(zkw, (SetData)op);
+ } else {
+ throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
+ + op.getClass().getName());
+ }
+ }
+ }
+
//
// ZooKeeper cluster information
//
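Putting the pieces together, a caller builds high-level ZKUtilOp instances and lets multiOrSequential choose the execution strategy; a minimal sketch (the paths and the connected ZooKeeperWatcher named zkw are illustrative assumptions):

  List<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
  ops.add(ZKUtilOp.createAndFailSilent("/hbase/example", new byte[0]));
  ops.add(ZKUtilOp.setData("/hbase/example", Bytes.toBytes("state")));
  ops.add(ZKUtilOp.deleteNodeFailSilent("/hbase/example-old"));
  // One atomic multi when hbase.zookeeper.useMulti=true; the final true allows
  // a one-by-one rerun on NODEEXISTS/NONODE/BADVERSION/NOAUTH failures.
  ZKUtil.multiOrSequential(zkw, ops, true);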
Modified: hbase/branches/0.94-test/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/resources/hbase-default.xml?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/resources/hbase-default.xml (original)
+++ hbase/branches/0.94-test/src/main/resources/hbase-default.xml Sun Dec 23 20:54:12 2012
@@ -690,6 +690,17 @@
for more information.
</description>
</property>
+ <property>
+ <name>hbase.zookeeper.useMulti</name>
+ <value>false</value>
+ <description>Instructs HBase to make use of ZooKeeper's multi-update functionality.
+ This allows certain ZooKeeper operations to complete more quickly and prevents some issues
+ with rare ZooKeeper failure scenarios (see the release note of HBASE-6710 for an example).
+ IMPORTANT: only set this to true if all ZooKeeper servers in the cluster are on version 3.4+
+ and will not be downgraded. ZooKeeper versions before 3.4 do not support multi-update and will
+ not fail gracefully if multi-update is invoked (see ZOOKEEPER-1495).
+ </description>
+ </property>
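A deployment that has verified every ensemble member is on ZooKeeper 3.4+ can enable the feature in hbase-site.xml or programmatically; a sketch of the programmatic form, using the HConstants key referenced by multiOrSequential():

  Configuration conf = HBaseConfiguration.create();
  // Only safe once all ZooKeeper servers are on 3.4+ and will not be downgraded.
  conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);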
<!-- End of properties used to generate ZooKeeper host:port quorum list. -->
<!--