You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/01/09 03:44:27 UTC
[23/66] [abbrv] accumulo git commit: ACCUMULO-3451 Format master
branch (1.7.0-SNAPSHOT)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java b/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
index f32fb70..3c88f48 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
@@ -28,49 +28,49 @@ import org.apache.log4j.Logger;
/**
* This store removes Repos, in the store it wraps, that are in a finished or new state for more than a configurable time period.
- *
+ *
* No external time source is used. It starts tracking idle time when it's created.
- *
+ *
*/
public class AgeOffStore<T> implements TStore<T> {
-
+
public interface TimeSource {
long currentTimeMillis();
}
-
+
final private static Logger log = Logger.getLogger(AgeOffStore.class);
-
+
private TStore<T> store;
private Map<Long,Long> candidates;
private long ageOffTime;
private long minTime;
private TimeSource timeSource;
-
+
private synchronized void updateMinTime() {
minTime = Long.MAX_VALUE;
-
+
for (Long time : candidates.values()) {
if (time < minTime)
minTime = time;
}
}
-
+
private synchronized void addCandidate(long txid) {
long time = timeSource.currentTimeMillis();
candidates.put(txid, time);
if (time < minTime)
minTime = time;
}
-
+
private synchronized void removeCandidate(long txid) {
Long time = candidates.remove(txid);
if (time != null && time <= minTime)
updateMinTime();
}
-
+
public void ageOff() {
HashSet<Long> oldTxs = new HashSet<Long>();
-
+
synchronized (this) {
long time = timeSource.currentTimeMillis();
if (minTime < time && time - minTime >= ageOffTime) {
@@ -79,12 +79,12 @@ public class AgeOffStore<T> implements TStore<T> {
oldTxs.add(entry.getKey());
}
}
-
+
candidates.keySet().removeAll(oldTxs);
updateMinTime();
}
}
-
+
for (Long txid : oldTxs) {
try {
store.reserve(txid);
@@ -99,7 +99,7 @@ public class AgeOffStore<T> implements TStore<T> {
default:
break;
}
-
+
} finally {
store.unreserve(txid, 0);
}
@@ -108,15 +108,15 @@ public class AgeOffStore<T> implements TStore<T> {
}
}
}
-
+
public AgeOffStore(TStore<T> store, long ageOffTime, TimeSource timeSource) {
this.store = store;
this.ageOffTime = ageOffTime;
this.timeSource = timeSource;
candidates = new HashMap<Long,Long>();
-
+
minTime = Long.MAX_VALUE;
-
+
List<Long> txids = store.list();
for (Long txid : txids) {
store.reserve(txid);
@@ -135,7 +135,7 @@ public class AgeOffStore<T> implements TStore<T> {
}
}
}
-
+
public AgeOffStore(TStore<T> store, long ageOffTime) {
this(store, ageOffTime, new TimeSource() {
@Override
@@ -144,53 +144,53 @@ public class AgeOffStore<T> implements TStore<T> {
}
});
}
-
+
@Override
public long create() {
long txid = store.create();
addCandidate(txid);
return txid;
}
-
+
@Override
public long reserve() {
return store.reserve();
}
-
+
@Override
public void reserve(long tid) {
store.reserve(tid);
}
-
+
@Override
public void unreserve(long tid, long deferTime) {
store.unreserve(tid, deferTime);
}
-
+
@Override
public Repo<T> top(long tid) {
return store.top(tid);
}
-
+
@Override
public void push(long tid, Repo<T> repo) throws StackOverflowException {
store.push(tid, repo);
}
-
+
@Override
public void pop(long tid) {
store.pop(tid);
}
-
+
@Override
public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) {
return store.getStatus(tid);
}
-
+
@Override
public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) {
store.setStatus(tid, status);
-
+
switch (status) {
case IN_PROGRESS:
case FAILED_IN_PROGRESS:
@@ -204,28 +204,28 @@ public class AgeOffStore<T> implements TStore<T> {
break;
}
}
-
+
@Override
public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid, EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) {
return store.waitForStatusChange(tid, expected);
}
-
+
@Override
public void setProperty(long tid, String prop, Serializable val) {
store.setProperty(tid, prop, val);
}
-
+
@Override
public Serializable getProperty(long tid, String prop) {
return store.getProperty(tid, prop);
}
-
+
@Override
public void delete(long tid) {
store.delete(tid);
removeCandidate(tid);
}
-
+
@Override
public List<Long> list() {
return store.list();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java
index 24d00d9..9ae7acb 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.fate;
/**
* Read only access to a repeatable persisted operation.
*
- * By definition, these methods are safe to call without impacting the state of FATE. They should also be
- * safe to call without impacting the state of system components.
+ * By definition, these methods are safe to call without impacting the state of FATE. They should also be safe to call without impacting the state of system
+ * components.
*
*/
public interface ReadOnlyRepo<T> {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java
index ad5e7e1..1f01090 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java
@@ -24,9 +24,9 @@ import com.google.common.base.Preconditions;
/**
* This store decorates a TStore to make sure it can not be modified.
- *
+ *
* Unlike relying directly on the ReadOnlyTStore interface, this class will not allow subsequent users to cast back to a mutable TStore successfully.
- *
+ *
*/
public class ReadOnlyStore<T> implements ReadOnlyTStore<T> {
@@ -58,7 +58,7 @@ public class ReadOnlyStore<T> implements ReadOnlyTStore<T> {
/**
* Decorates a Repo to make sure it is treated as a ReadOnlyRepo.
- *
+ *
* Similar to ReadOnlyStore, won't allow subsequent user to cast a ReadOnlyRepo back to a mutable Repo.
*/
protected static class ReadOnlyRepoWrapper<X> implements ReadOnlyRepo<X> {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java
index d390139..5c1344a 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java
@@ -23,8 +23,8 @@ import java.util.List;
/**
* Read only access to a Transaction Store.
*
- * A transaction consists of a number of operations. Instances of this class may check on the queue of outstanding
- * transactions but may neither modify them nor create new ones.
+ * A transaction consists of a number of operations. Instances of this class may check on the queue of outstanding transactions but may neither modify them nor
+ * create new ones.
*/
public interface ReadOnlyTStore<T> {
@@ -49,8 +49,7 @@ public interface ReadOnlyTStore<T> {
/**
* Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS.
*
- * Reserving a transaction id ensures that nothing else in-process interacting via the same instance
- * will be operating on that transaction id.
+ * Reserving a transaction id ensures that nothing else in-process interacting via the same instance will be operating on that transaction id.
*
* @return a transaction id that is safe to interact with, chosen by the store.
*/
@@ -59,8 +58,7 @@ public interface ReadOnlyTStore<T> {
/**
* Reserve the specific tid.
*
- * Reserving a transaction id ensures that nothing else in-process interacting via the same instance
- * will be operating on that transaction id.
+ * Reserving a transaction id ensures that nothing else in-process interacting via the same instance will be operating on that transaction id.
*
*/
void reserve(long tid);
@@ -70,18 +68,20 @@ public interface ReadOnlyTStore<T> {
*
* upon successful return the store now controls the referenced transaction id. caller should no longer interact with it.
*
- * @param tid transaction id, previously reserved.
- * @param deferTime time in millis to keep this transaction out of the pool used in the {@link #reserve() reserve} method. must be non-negative.
+ * @param tid
+ * transaction id, previously reserved.
+ * @param deferTime
+ * time in millis to keep this transaction out of the pool used in the {@link #reserve() reserve} method. must be non-negative.
*/
void unreserve(long tid, long deferTime);
-
/**
* Get the current operation for the given transaction id.
*
* Caller must have already reserved tid.
*
- * @param tid transaction id, previously reserved.
+ * @param tid
+ * transaction id, previously reserved.
* @return a read-only view of the operation
*/
ReadOnlyRepo<T> top(long tid);
@@ -91,7 +91,8 @@ public interface ReadOnlyTStore<T> {
*
* Caller must have already reserved tid.
*
- * @param tid transaction id, previously reserved.
+ * @param tid
+ * transaction id, previously reserved.
* @return execution status
*/
TStatus getStatus(long tid);
@@ -99,8 +100,10 @@ public interface ReadOnlyTStore<T> {
/**
* Wait for the status of a transaction to change
*
- * @param tid transaction id, need not have been reserved.
- * @param expected a set of possible statuses we are interested in being notified about. may not be null.
+ * @param tid
+ * transaction id, need not have been reserved.
+ * @param expected
+ * a set of possible statuses we are interested in being notified about. may not be null.
* @return execution status.
*/
TStatus waitForStatusChange(long tid, EnumSet<TStatus> expected);
@@ -110,8 +113,10 @@ public interface ReadOnlyTStore<T> {
*
* Caller must have already reserved tid.
*
- * @param tid transaction id, previously reserved.
- * @param prop name of property to retrieve.
+ * @param tid
+ * transaction id, previously reserved.
+ * @param prop
+ * name of property to retrieve.
*/
Serializable getProperty(long tid, String prop);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/Repo.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/Repo.java b/fate/src/main/java/org/apache/accumulo/fate/Repo.java
index b0ebd1a..0dcfd7f 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/Repo.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/Repo.java
@@ -20,14 +20,14 @@ import java.io.Serializable;
/**
* Repeatable persisted operation
- *
+ *
*/
public interface Repo<T> extends ReadOnlyRepo<T>, Serializable {
-
+
Repo<T> call(long tid, T environment) throws Exception;
-
+
void undo(long tid, T environment) throws Exception;
-
+
// this allows the last fate op to return something to the user
String getReturn();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java b/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java
index 6e38f1b..0f385d4 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java
@@ -17,11 +17,11 @@
package org.apache.accumulo.fate;
public class StackOverflowException extends Exception {
-
+
public StackOverflowException(String msg) {
super(msg);
}
-
+
private static final long serialVersionUID = 1L;
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/TStore.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/TStore.java b/fate/src/main/java/org/apache/accumulo/fate/TStore.java
index 3adb493..bdcfba3 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/TStore.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/TStore.java
@@ -20,7 +20,7 @@ import java.io.Serializable;
/**
* Transaction Store: a place to save transactions
- *
+ *
* A transaction consists of a number of operations. To use, first create a transaction id, and then seed the transaction with an initial operation. An executor
* service can then execute the transaction's operation, possibly pushing more operations onto the transaction as each step successfully completes. If a step
* fails, the stack can be unwound, undoing each operation.
@@ -29,14 +29,14 @@ public interface TStore<T> extends ReadOnlyTStore<T> {
/**
* Create a new transaction id
- *
+ *
* @return a transaction id
*/
long create();
/**
* Get the current operation for the given transaction id.
- *
+ *
* @param tid
* transaction id
* @return the operation
@@ -46,7 +46,7 @@ public interface TStore<T> extends ReadOnlyTStore<T> {
/**
* Update the given transaction with the next operation
- *
+ *
* @param tid
* the transaction id
* @param repo
@@ -61,7 +61,7 @@ public interface TStore<T> extends ReadOnlyTStore<T> {
/**
* Update the state of a given transaction
- *
+ *
* @param tid
* transaction id
* @param status
@@ -73,7 +73,7 @@ public interface TStore<T> extends ReadOnlyTStore<T> {
/**
* Remove the transaction from the store.
- *
+ *
* @param tid
* the transaction id
*/
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java b/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
index 0dc156e..6c8a7d6 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
@@ -46,7 +46,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
//TODO document zookeeper layout - ACCUMULO-1298
public class ZooStore<T> implements TStore<T> {
-
+
private String path;
private IZooReaderWriter zk;
private String lastReserved = "";
@@ -55,21 +55,21 @@ public class ZooStore<T> implements TStore<T> {
private SecureRandom idgenerator;
private long statusChangeEvents = 0;
private int reservationsWaiting = 0;
-
+
private byte[] serialize(Object o) {
-
+
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
-
+
return baos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
-
+
private Object deserialize(byte ser[]) {
try {
ByteArrayInputStream bais = new ByteArrayInputStream(ser);
@@ -79,26 +79,26 @@ public class ZooStore<T> implements TStore<T> {
throw new RuntimeException(e);
}
}
-
+
private String getTXPath(long tid) {
return String.format("%s/tx_%016x", path, tid);
}
-
+
private long parseTid(String txdir) {
return Long.parseLong(txdir.split("_")[1], 16);
}
-
+
public ZooStore(String path, IZooReaderWriter zk) throws KeeperException, InterruptedException {
-
+
this.path = path;
this.zk = zk;
this.reserved = new HashSet<Long>();
this.defered = new HashMap<Long,Long>();
this.idgenerator = new SecureRandom();
-
+
zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP);
}
-
+
@Override
public long create() {
while (true) {
@@ -114,28 +114,28 @@ public class ZooStore<T> implements TStore<T> {
}
}
}
-
+
@Override
public long reserve() {
try {
while (true) {
-
+
long events;
synchronized (this) {
events = statusChangeEvents;
}
-
+
List<String> txdirs = new ArrayList<String>(zk.getChildren(path));
Collections.sort(txdirs);
-
+
synchronized (this) {
if (txdirs.size() > 0 && txdirs.get(txdirs.size() - 1).compareTo(lastReserved) <= 0)
lastReserved = "";
}
-
+
for (String txdir : txdirs) {
long tid = parseTid(txdir);
-
+
synchronized (this) {
// this check makes reserve pick up where it left off, so that it cycles through all as it is repeatedly called.... failing to do so can lead to
// starvation where fate ops that sort higher and hold a lock are never reserved.
@@ -151,13 +151,12 @@ public class ZooStore<T> implements TStore<T> {
if (!reserved.contains(tid)) {
reserved.add(tid);
lastReserved = txdir;
- }
- else
+ } else
continue;
}
-
+
// have reserved id, status should not change
-
+
try {
TStatus status = TStatus.valueOf(new String(zk.getData(path + "/" + txdir, null), UTF_8));
if (status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS) {
@@ -173,7 +172,7 @@ public class ZooStore<T> implements TStore<T> {
throw e;
}
}
-
+
synchronized (this) {
if (events == statusChangeEvents) {
if (defered.size() > 0) {
@@ -190,7 +189,7 @@ public class ZooStore<T> implements TStore<T> {
throw new RuntimeException(e);
}
}
-
+
public void reserve(long tid) {
synchronized (this) {
reservationsWaiting++;
@@ -201,63 +200,63 @@ public class ZooStore<T> implements TStore<T> {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
-
+
reserved.add(tid);
} finally {
reservationsWaiting--;
}
}
}
-
+
private void unreserve(long tid) {
synchronized (this) {
if (!reserved.remove(tid))
throw new IllegalStateException("Tried to unreserve id that was not reserved " + String.format("%016x", tid));
-
+
// do not want this unreserve to unnecessarily wake up threads in reserve()... this leads to infinite loop when tx is stuck in NEW...
// only do this when something external has called reserve(tid)...
if (reservationsWaiting > 0)
this.notifyAll();
}
}
-
+
@Override
public void unreserve(long tid, long deferTime) {
-
+
if (deferTime < 0)
throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
-
+
synchronized (this) {
if (!reserved.remove(tid))
throw new IllegalStateException("Tried to unreserve id that was not reserved " + String.format("%016x", tid));
-
+
if (deferTime > 0)
defered.put(tid, System.currentTimeMillis() + deferTime);
-
+
this.notifyAll();
}
-
+
}
-
+
private void verifyReserved(long tid) {
synchronized (this) {
if (!reserved.contains(tid))
throw new IllegalStateException("Tried to operate on unreserved transaction " + String.format("%016x", tid));
}
}
-
+
@SuppressWarnings("unchecked")
@Override
public Repo<T> top(long tid) {
verifyReserved(tid);
-
+
while (true) {
try {
String txpath = getTXPath(tid);
String top = findTop(txpath);
if (top == null)
return null;
-
+
byte[] ser = zk.getData(txpath + "/" + top, null);
return (Repo<T>) deserialize(ser);
} catch (KeeperException.NoNodeException ex) {
@@ -267,35 +266,35 @@ public class ZooStore<T> implements TStore<T> {
}
}
}
-
+
private String findTop(String txpath) throws KeeperException, InterruptedException {
List<String> ops = zk.getChildren(txpath);
-
+
ops = new ArrayList<String>(ops);
-
+
String max = "";
-
+
for (String child : ops)
if (child.startsWith("repo_") && child.compareTo(max) > 0)
max = child;
-
+
if (max.equals(""))
return null;
-
+
return max;
}
-
+
@Override
public void push(long tid, Repo<T> repo) throws StackOverflowException {
verifyReserved(tid);
-
+
String txpath = getTXPath(tid);
try {
String top = findTop(txpath);
if (top != null && Long.parseLong(top.split("_")[1]) > 100) {
throw new StackOverflowException("Repo stack size too large");
}
-
+
zk.putPersistentSequential(txpath + "/repo_", serialize(repo));
} catch (StackOverflowException soe) {
throw soe;
@@ -303,11 +302,11 @@ public class ZooStore<T> implements TStore<T> {
throw new RuntimeException(e);
}
}
-
+
@Override
public void pop(long tid) {
verifyReserved(tid);
-
+
try {
String txpath = getTXPath(tid);
String top = findTop(txpath);
@@ -318,7 +317,7 @@ public class ZooStore<T> implements TStore<T> {
throw new RuntimeException(e);
}
}
-
+
private TStatus _getStatus(long tid) {
try {
return TStatus.valueOf(new String(zk.getData(getTXPath(tid), null), UTF_8));
@@ -328,13 +327,13 @@ public class ZooStore<T> implements TStore<T> {
throw new RuntimeException(e);
}
}
-
+
@Override
public TStatus getStatus(long tid) {
verifyReserved(tid);
return _getStatus(tid);
}
-
+
@Override
public TStatus waitForStatusChange(long tid, EnumSet<TStatus> expected) {
while (true) {
@@ -342,11 +341,11 @@ public class ZooStore<T> implements TStore<T> {
synchronized (this) {
events = statusChangeEvents;
}
-
+
TStatus status = _getStatus(tid);
if (expected.contains(status))
return status;
-
+
synchronized (this) {
if (events == statusChangeEvents) {
try {
@@ -358,38 +357,38 @@ public class ZooStore<T> implements TStore<T> {
}
}
}
-
+
@Override
public void setStatus(long tid, TStatus status) {
verifyReserved(tid);
-
+
try {
zk.putPersistentData(getTXPath(tid), status.name().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
synchronized (this) {
statusChangeEvents++;
}
-
+
}
-
+
@Override
public void delete(long tid) {
verifyReserved(tid);
-
+
try {
zk.recursiveDelete(getTXPath(tid), NodeMissingPolicy.SKIP);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
-
+
@Override
public void setProperty(long tid, String prop, Serializable so) {
verifyReserved(tid);
-
+
try {
if (so instanceof String) {
zk.putPersistentData(getTXPath(tid) + "/prop_" + prop, ("S " + so).getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
@@ -405,14 +404,14 @@ public class ZooStore<T> implements TStore<T> {
throw new RuntimeException(e2);
}
}
-
+
@Override
public Serializable getProperty(long tid, String prop) {
verifyReserved(tid);
-
+
try {
byte[] data = zk.getData(getTXPath(tid) + "/prop_" + prop, null);
-
+
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
@@ -428,7 +427,7 @@ public class ZooStore<T> implements TStore<T> {
throw new RuntimeException(e);
}
}
-
+
@Override
public List<Long> list() {
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java b/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java
index 401bc1a..33e84cf 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java
@@ -29,7 +29,7 @@ public class AddressUtil {
/**
* Fetch the security value that determines how long DNS failures are cached. Looks up the security property 'networkaddress.cache.negative.ttl'. Should that
* fail returns the default value used in the Oracle JVM 1.4+, which is 10 seconds.
- *
+ *
* @param originalException
* the host lookup that is the source of needing this lookup. may be null.
* @return positive integer number of seconds
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java b/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java
index 95fc6d3..da7c41c 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java
@@ -17,44 +17,44 @@
package org.apache.accumulo.fate.util;
public class Daemon extends Thread {
-
+
public Daemon() {
setDaemon(true);
}
-
+
public Daemon(Runnable target) {
super(target);
setDaemon(true);
}
-
+
public Daemon(String name) {
super(name);
setDaemon(true);
}
-
+
public Daemon(ThreadGroup group, Runnable target) {
super(group, target);
setDaemon(true);
}
-
+
public Daemon(ThreadGroup group, String name) {
super(group, name);
setDaemon(true);
}
-
+
public Daemon(Runnable target, String name) {
super(target, name);
setDaemon(true);
}
-
+
public Daemon(ThreadGroup group, Runnable target, String name) {
super(group, target, name);
setDaemon(true);
}
-
+
public Daemon(ThreadGroup group, Runnable target, String name, long stackSize) {
super(group, target, name, stackSize);
setDaemon(true);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java b/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java
index 32938e0..ba45488 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java
@@ -23,12 +23,12 @@ import org.apache.log4j.Logger;
public class LoggingRunnable implements Runnable {
private Runnable runnable;
private Logger log;
-
+
public LoggingRunnable(Logger log, Runnable r) {
this.runnable = r;
this.log = log;
}
-
+
public void run() {
try {
runnable.run();
@@ -39,7 +39,7 @@ public class LoggingRunnable implements Runnable {
// maybe the logging system is screwed up OR there is a bug in the exception, like t.getMessage() throws a NPE
System.err.println("ERROR " + new Date() + " Failed to log message about thread death " + t2.getMessage());
t2.printStackTrace();
-
+
// try to print original exception
System.err.println("ERROR " + new Date() + " Exception that failed to log : " + t.getMessage());
t.printStackTrace();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java b/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java
index 32cdb5b..fb5701a 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java
@@ -20,7 +20,7 @@ import org.apache.log4j.Logger;
public class UtilWaitThread {
private static final Logger log = Logger.getLogger(UtilWaitThread.class);
-
+
public static void sleep(long millis) {
try {
Thread.sleep(millis);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
index d10fcd2..fc94ff4 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
@@ -33,22 +33,22 @@ import org.apache.log4j.Logger;
// A ReadWriteLock that can be implemented in ZooKeeper. Features the ability to store data
// with the lock, and recover the lock using that data to find the lock.
public class DistributedReadWriteLock implements java.util.concurrent.locks.ReadWriteLock {
-
+
static enum LockType {
READ, WRITE,
};
-
+
// serializer for lock type and user data
static class ParsedLock {
public ParsedLock(LockType type, byte[] userData) {
this.type = type;
this.userData = Arrays.copyOf(userData, userData.length);
}
-
+
public ParsedLock(byte[] lockData) {
if (lockData == null || lockData.length < 1)
throw new IllegalArgumentException();
-
+
int split = -1;
for (int i = 0; i < lockData.length; i++) {
if (lockData[i] == ':') {
@@ -56,22 +56,22 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
break;
}
}
-
+
if (split == -1)
throw new IllegalArgumentException();
-
+
this.type = LockType.valueOf(new String(lockData, 0, split, UTF_8));
this.userData = Arrays.copyOfRange(lockData, split + 1, lockData.length);
}
-
+
public LockType getType() {
return type;
}
-
+
public byte[] getUserData() {
return userData;
}
-
+
public byte[] getLockData() {
byte typeBytes[] = type.name().getBytes(UTF_8);
byte[] result = new byte[userData.length + 1 + typeBytes.length];
@@ -80,46 +80,46 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
System.arraycopy(userData, 0, result, typeBytes.length + 1, userData.length);
return result;
}
-
+
private LockType type;
private byte[] userData;
}
-
+
// This kind of lock can be easily implemented by ZooKeeper
// You make an entry at the bottom of the queue, readers run when there are no writers ahead of them,
// a writer only runs when they are at the top of the queue.
public interface QueueLock {
SortedMap<Long,byte[]> getEarlierEntries(long entry);
-
+
void removeEntry(long entry);
-
+
long addEntry(byte[] data);
}
-
+
private static final Logger log = Logger.getLogger(DistributedReadWriteLock.class);
-
+
static class ReadLock implements Lock {
-
+
QueueLock qlock;
byte[] userData;
long entry = -1;
-
+
ReadLock(QueueLock qlock, byte[] userData) {
this.qlock = qlock;
this.userData = userData;
}
-
+
// for recovery
ReadLock(QueueLock qlock, byte[] userData, long entry) {
this.qlock = qlock;
this.userData = userData;
this.entry = entry;
}
-
+
protected LockType lockType() {
return LockType.READ;
}
-
+
@Override
public void lock() {
while (true) {
@@ -131,7 +131,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
}
}
}
-
+
@Override
public void lockInterruptibly() throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
@@ -139,7 +139,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
return;
}
}
-
+
@Override
public boolean tryLock() {
if (entry == -1) {
@@ -157,7 +157,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + " userData " + new String(this.userData, UTF_8) + " lockType "
+ lockType());
}
-
+
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long now = System.currentTimeMillis();
@@ -171,7 +171,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
}
return false;
}
-
+
@Override
public void unlock() {
if (entry == -1)
@@ -180,28 +180,28 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
qlock.removeEntry(entry);
entry = -1;
}
-
+
@Override
public Condition newCondition() {
throw new NotImplementedException();
}
}
-
+
static class WriteLock extends ReadLock {
-
+
WriteLock(QueueLock qlock, byte[] userData) {
super(qlock, userData);
}
-
+
WriteLock(QueueLock qlock, byte[] userData, long entry) {
super(qlock, userData, entry);
}
-
+
@Override
protected LockType lockType() {
return LockType.WRITE;
}
-
+
@Override
public boolean tryLock() {
if (entry == -1) {
@@ -211,22 +211,22 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
if (!iterator.hasNext())
- throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + " userData " + new String(this.userData, UTF_8) + " lockType "
- + lockType());
+ throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + " userData " + new String(this.userData, UTF_8)
+ + " lockType " + lockType());
if (iterator.next().getKey().equals(entry))
return true;
return false;
}
}
-
+
private QueueLock qlock;
private byte[] data;
-
+
public DistributedReadWriteLock(QueueLock qlock, byte[] data) {
this.qlock = qlock;
this.data = Arrays.copyOf(data, data.length);
}
-
+
static public Lock recoverLock(QueueLock qlock, byte[] data) {
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
@@ -242,12 +242,12 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
}
return null;
}
-
+
@Override
public Lock readLock() {
return new ReadLock(qlock, data);
}
-
+
@Override
public Lock writeLock() {
return new WriteLock(qlock, data);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
index ad2a191..14d104b 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
@@ -40,7 +40,7 @@ public interface IZooReaderWriter extends IZooReader {
boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
-
+
boolean putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy, List<ACL> acls) throws KeeperException, InterruptedException;
String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
index cc4bebf..bd27fb9 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
@@ -24,20 +24,21 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
public class TransactionWatcher {
-
+
public interface Arbitrator {
boolean transactionAlive(String type, long tid) throws Exception;
+
boolean transactionComplete(String type, long tid) throws Exception;
}
-
+
private static final Logger log = Logger.getLogger(TransactionWatcher.class);
final private Map<Long,AtomicInteger> counts = new HashMap<Long,AtomicInteger>();
final private Arbitrator arbitrator;
-
+
public TransactionWatcher(Arbitrator arbitrator) {
this.arbitrator = arbitrator;
}
-
+
public <T> T run(String ztxBulk, long tid, Callable<T> callable) throws Exception {
synchronized (counts) {
if (!arbitrator.transactionAlive(ztxBulk, tid)) {
@@ -62,7 +63,7 @@ public class TransactionWatcher {
}
}
}
-
+
public boolean isActive(long tid) {
synchronized (counts) {
log.debug("Transactions in progress " + counts);
@@ -70,5 +71,5 @@ public class TransactionWatcher {
return count != null && count.get() > 0;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index ee53ddd..5cfdbb8 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@ -40,8 +40,7 @@ import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
/**
- * A cache for values stored in ZooKeeper. Values are kept up to date as they
- * change.
+ * A cache for values stored in ZooKeeper. Values are kept up to date as they change.
*/
public class ZooCache {
private static final Logger log = Logger.getLogger(ZooCache.class);
@@ -104,31 +103,36 @@ public class ZooCache {
/**
* Creates a new cache.
*
- * @param zooKeepers comma-separated list of ZooKeeper host[:port]s
- * @param sessionTimeout ZooKeeper session timeout
+ * @param zooKeepers
+ * comma-separated list of ZooKeeper host[:port]s
+ * @param sessionTimeout
+ * ZooKeeper session timeout
*/
public ZooCache(String zooKeepers, int sessionTimeout) {
this(zooKeepers, sessionTimeout, null);
}
/**
- * Creates a new cache. The given watcher is called whenever a watched node
- * changes.
+ * Creates a new cache. The given watcher is called whenever a watched node changes.
*
- * @param zooKeepers comma-separated list of ZooKeeper host[:port]s
- * @param sessionTimeout ZooKeeper session timeout
- * @param watcher watcher object
+ * @param zooKeepers
+ * comma-separated list of ZooKeeper host[:port]s
+ * @param sessionTimeout
+ * ZooKeeper session timeout
+ * @param watcher
+ * watcher object
*/
public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
this(new ZooReader(zooKeepers, sessionTimeout), watcher);
}
/**
- * Creates a new cache. The given watcher is called whenever a watched node
- * changes.
+ * Creates a new cache. The given watcher is called whenever a watched node changes.
*
- * @param reader ZooKeeper reader
- * @param watcher watcher object
+ * @param reader
+ * ZooKeeper reader
+ * @param watcher
+ * watcher object
*/
public ZooCache(ZooReader reader, Watcher watcher) {
this.zReader = reader;
@@ -187,7 +191,8 @@ public class ZooCache {
/**
* Gets the children of the given node. A watch is established by this call.
*
- * @param zPath path of node
+ * @param zPath
+ * path of node
* @return children list, or null if node has no children or does not exist
*/
public synchronized List<String> getChildren(final String zPath) {
@@ -222,10 +227,10 @@ public class ZooCache {
}
/**
- * Gets data at the given path. Status information is not returned. A watch is
- * established by this call.
+ * Gets data at the given path. Status information is not returned. A watch is established by this call.
*
- * @param zPath path to get
+ * @param zPath
+ * path to get
* @return path data, or null if non-existent
*/
public synchronized byte[] get(final String zPath) {
@@ -233,11 +238,12 @@ public class ZooCache {
}
/**
- * Gets data at the given path, filling status information into the given
- * <code>Stat</code> object. A watch is established by this call.
+ * Gets data at the given path, filling status information into the given <code>Stat</code> object. A watch is established by this call.
*
- * @param zPath path to get
- * @param stat status object to populate
+ * @param zPath
+ * path to get
+ * @param stat
+ * status object to populate
* @return path data, or null if non-existent
*/
public synchronized byte[] get(final String zPath, Stat stat) {
@@ -332,17 +338,20 @@ public class ZooCache {
/**
* Checks if a data value (or lack of one) is cached.
*
- * @param zPath path of node
+ * @param zPath
+ * path of node
* @return true if data value is cached
*/
@VisibleForTesting
synchronized boolean dataCached(String zPath) {
return cache.containsKey(zPath);
}
+
/**
* Checks if children of a node (or lack of them) are cached.
*
- * @param zPath path of node
+ * @param zPath
+ * path of node
* @return true if children are cached
*/
@VisibleForTesting
@@ -353,7 +362,8 @@ public class ZooCache {
/**
* Clears this cache of all information about nodes rooted at the given path.
*
- * @param zPath path of top node
+ * @param zPath
+ * path of top node
*/
public synchronized void clear(String zPath) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java
index 3c59a00..1475928 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.fate.zookeeper;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.zookeeper.Watcher;
/**
@@ -47,15 +48,17 @@ public class ZooCacheFactory {
return zc;
}
}
+
/**
- * Gets a watched {@link ZooCache}. If the watcher is null, then the same (unwatched)
- * object may be returned for multiple calls with the same remaining arguments.
+ * Gets a watched {@link ZooCache}. If the watcher is null, then the same (unwatched) object may be returned for multiple calls with the same remaining
+ * arguments.
*
* @param zooKeepers
* comma-separated list of ZooKeeper host[:port]s
* @param sessionTimeout
* session timeout
- * @param watcher watcher (optional)
+ * @param watcher
+ * watcher (optional)
* @return cache object
*/
public ZooCache getZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
index 1391d98..a0100b2 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
@@ -35,28 +35,28 @@ import org.apache.zookeeper.data.Stat;
public class ZooLock implements Watcher {
private static final Logger log = Logger.getLogger(ZooLock.class);
-
+
public static final String LOCK_PREFIX = "zlock-";
-
+
public enum LockLossReason {
LOCK_DELETED, SESSION_EXPIRED
}
-
+
public interface LockWatcher {
void lostLock(LockLossReason reason);
-
+
/**
* lost the ability to monitor the lock node, and its status is unknown
*/
void unableToMonitorLockNode(Throwable e);
}
-
+
public interface AsyncLockWatcher extends LockWatcher {
void acquiredLock();
-
+
void failedToAcquireLock(Exception e);
}
-
+
private boolean lockWasAcquired;
final private String path;
protected final IZooReaderWriter zooKeeper;
@@ -64,11 +64,11 @@ public class ZooLock implements Watcher {
private LockWatcher lockWatcher;
private boolean watchingParent = false;
private String asyncLock;
-
+
public ZooLock(String zookeepers, int timeInMillis, String scheme, byte[] auth, String path) {
this(new ZooCacheFactory().getZooCache(zookeepers, timeInMillis), ZooReaderWriter.getInstance(zookeepers, timeInMillis, scheme, auth), path);
}
-
+
protected ZooLock(ZooCache zc, IZooReaderWriter zrw, String path) {
getLockDataZooCache = zc;
this.path = path;
@@ -81,66 +81,66 @@ public class ZooLock implements Watcher {
throw new RuntimeException(ex);
}
}
-
+
private static class TryLockAsyncLockWatcher implements AsyncLockWatcher {
-
+
boolean acquiredLock = false;
LockWatcher lw;
-
+
public TryLockAsyncLockWatcher(LockWatcher lw2) {
this.lw = lw2;
}
-
+
@Override
public void acquiredLock() {
acquiredLock = true;
}
-
+
@Override
public void failedToAcquireLock(Exception e) {}
-
+
@Override
public void lostLock(LockLossReason reason) {
lw.lostLock(reason);
}
-
+
@Override
public void unableToMonitorLockNode(Throwable e) {
lw.unableToMonitorLockNode(e);
}
-
+
}
-
+
public synchronized boolean tryLock(LockWatcher lw, byte data[]) throws KeeperException, InterruptedException {
-
+
TryLockAsyncLockWatcher tlalw = new TryLockAsyncLockWatcher(lw);
-
+
lockAsync(tlalw, data);
-
+
if (tlalw.acquiredLock) {
return true;
}
-
+
if (asyncLock != null) {
zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
asyncLock = null;
}
-
+
return false;
}
-
+
private synchronized void lockAsync(final String myLock, final AsyncLockWatcher lw) throws KeeperException, InterruptedException {
-
+
if (asyncLock == null) {
throw new IllegalStateException("Called lockAsync() when asyncLock == null");
}
-
+
List<String> children = zooKeeper.getChildren(path);
-
+
if (!children.contains(myLock)) {
throw new RuntimeException("Lock attempt ephemeral node no longer exist " + myLock);
}
-
+
Collections.sort(children);
if (log.isTraceEnabled()) {
log.trace("Candidate lock nodes");
@@ -148,7 +148,7 @@ public class ZooLock implements Watcher {
log.trace("- " + child);
}
}
-
+
if (children.get(0).equals(myLock)) {
log.trace("First candidate is my lock, acquiring");
if (!watchingParent) {
@@ -166,14 +166,14 @@ public class ZooLock implements Watcher {
if (child.equals(myLock)) {
break;
}
-
+
prev = child;
}
-
+
final String lockToWatch = path + "/" + prev;
log.trace("Establishing watch on " + lockToWatch);
Stat stat = zooKeeper.getStatus(lockToWatch, new Watcher() {
-
+
@Override
public void process(WatchedEvent event) {
if (log.isTraceEnabled()) {
@@ -224,39 +224,39 @@ public class ZooLock implements Watcher {
}
}
}
-
+
});
-
+
if (stat == null)
lockAsync(myLock, lw);
}
-
+
private void lostLock(LockLossReason reason) {
LockWatcher localLw = lockWatcher;
lock = null;
lockWatcher = null;
-
+
localLw.lostLock(reason);
}
public synchronized void lockAsync(final AsyncLockWatcher lw, byte data[]) {
-
+
if (lockWatcher != null || lock != null || asyncLock != null) {
throw new IllegalStateException();
}
-
+
lockWasAcquired = false;
-
+
try {
final String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
log.trace("Ephemeral node " + asyncLockPath + " created");
Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
-
- private void failedToAcquireLock(){
+
+ private void failedToAcquireLock() {
lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
asyncLock = null;
}
-
+
public void process(WatchedEvent event) {
synchronized (ZooLock.this) {
if (lock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + lock)) {
@@ -264,13 +264,13 @@ public class ZooLock implements Watcher {
} else if (asyncLock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + asyncLock)) {
failedToAcquireLock();
} else if (event.getState() != KeeperState.Disconnected && event.getState() != KeeperState.Expired && (lock != null || asyncLock != null)) {
- log.debug("Unexpected event watching lock node "+event+" "+asyncLockPath);
+ log.debug("Unexpected event watching lock node " + event + " " + asyncLockPath);
try {
Stat stat2 = zooKeeper.getStatus(asyncLockPath, this);
- if(stat2 == null){
- if(lock != null)
+ if (stat2 == null) {
+ if (lock != null)
lostLock(LockLossReason.LOCK_DELETED);
- else if(asyncLock != null)
+ else if (asyncLock != null)
failedToAcquireLock();
}
} catch (Throwable e) {
@@ -278,105 +278,105 @@ public class ZooLock implements Watcher {
log.error("Failed to stat lock node " + asyncLockPath, e);
}
}
-
+
}
}
});
-
+
if (stat == null) {
lw.failedToAcquireLock(new Exception("Lock does not exist after create"));
return;
}
-
+
asyncLock = asyncLockPath.substring(path.length() + 1);
-
+
lockAsync(asyncLock, lw);
-
+
} catch (KeeperException e) {
lw.failedToAcquireLock(e);
} catch (InterruptedException e) {
lw.failedToAcquireLock(e);
}
}
-
+
public synchronized boolean tryToCancelAsyncLockOrUnlock() throws InterruptedException, KeeperException {
boolean del = false;
-
+
if (asyncLock != null) {
zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
del = true;
}
-
+
if (lock != null) {
unlock();
del = true;
}
-
+
return del;
}
-
+
public synchronized void unlock() throws InterruptedException, KeeperException {
if (lock == null) {
throw new IllegalStateException();
}
-
+
LockWatcher localLw = lockWatcher;
String localLock = lock;
-
+
lock = null;
lockWatcher = null;
-
+
zooKeeper.recursiveDelete(path + "/" + localLock, NodeMissingPolicy.SKIP);
-
+
localLw.lostLock(LockLossReason.LOCK_DELETED);
}
-
+
public synchronized String getLockPath() {
if (lock == null) {
return null;
}
return path + "/" + lock;
}
-
+
public synchronized String getLockName() {
return lock;
}
-
+
public synchronized LockID getLockID() {
if (lock == null) {
throw new IllegalStateException("Lock not held");
}
return new LockID(path, lock, zooKeeper.getZooKeeper().getSessionId());
}
-
+
/**
* indicates if the lock was acquired in the past.... helps discriminate between the case where the lock was never held, or held and lost....
- *
+ *
* @return true if the lock was acquired, otherwise false.
*/
public synchronized boolean wasLockAcquired() {
return lockWasAcquired;
}
-
+
public synchronized boolean isLocked() {
return lock != null;
}
-
+
public synchronized void replaceLockData(byte[] b) throws KeeperException, InterruptedException {
- if (getLockPath()!=null)
+ if (getLockPath() != null)
zooKeeper.getZooKeeper().setData(getLockPath(), b, -1);
}
-
+
@Override
public synchronized void process(WatchedEvent event) {
log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
-
+
watchingParent = false;
if (event.getState() == KeeperState.Expired && lock != null) {
lostLock(LockLossReason.SESSION_EXPIRED);
} else {
-
+
try { // set the watch on the parent node again
zooKeeper.getStatus(path, this);
watchingParent = true;
@@ -389,151 +389,151 @@ public class ZooLock implements Watcher {
log.error("Error resetting watch on ZooLock " + lock == null ? asyncLock : lock + " " + event, ex);
}
}
-
+
}
}
-
+
public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
-
+
List<String> children = zk.getChildren(lid.path, false);
-
+
if (children == null || children.size() == 0) {
return false;
}
-
+
Collections.sort(children);
-
+
String lockNode = children.get(0);
if (!lid.node.equals(lockNode))
return false;
-
+
Stat stat = zk.exists(lid.path + "/" + lid.node, false);
return stat != null && stat.getEphemeralOwner() == lid.eid;
}
-
+
public static boolean isLockHeld(ZooCache zc, LockID lid) {
-
+
List<String> children = zc.getChildren(lid.path);
-
+
if (children == null || children.size() == 0) {
return false;
}
-
+
children = new ArrayList<String>(children);
Collections.sort(children);
-
+
String lockNode = children.get(0);
if (!lid.node.equals(lockNode))
return false;
-
+
Stat stat = new Stat();
return zc.get(lid.path + "/" + lid.node, stat) != null && stat.getEphemeralOwner() == lid.eid;
}
-
+
public static byte[] getLockData(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
List<String> children = zk.getChildren(path, false);
-
+
if (children == null || children.size() == 0) {
return null;
}
-
+
Collections.sort(children);
-
+
String lockNode = children.get(0);
-
+
return zk.getData(path + "/" + lockNode, false, null);
}
-
+
public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path, Stat stat) {
-
+
List<String> children = zc.getChildren(path);
-
+
if (children == null || children.size() == 0) {
return null;
}
-
+
children = new ArrayList<String>(children);
Collections.sort(children);
-
+
String lockNode = children.get(0);
-
+
if (!lockNode.startsWith(LOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
-
+
return zc.get(path + "/" + lockNode, stat);
}
-
+
public static long getSessionId(ZooCache zc, String path) throws KeeperException, InterruptedException {
List<String> children = zc.getChildren(path);
-
+
if (children == null || children.size() == 0) {
return 0;
}
-
+
children = new ArrayList<String>(children);
Collections.sort(children);
-
+
String lockNode = children.get(0);
-
+
Stat stat = new Stat();
if (zc.get(path + "/" + lockNode, stat) != null)
return stat.getEphemeralOwner();
return 0;
}
-
+
private static ZooCache getLockDataZooCache;
-
+
public long getSessionId() throws KeeperException, InterruptedException {
return getSessionId(getLockDataZooCache, path);
}
-
+
public static void deleteLock(IZooReaderWriter zk, String path) throws InterruptedException, KeeperException {
List<String> children;
-
+
children = zk.getChildren(path);
-
+
if (children == null || children.size() == 0) {
throw new IllegalStateException("No lock is held at " + path);
}
-
+
Collections.sort(children);
-
+
String lockNode = children.get(0);
-
+
if (!lockNode.startsWith(LOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
-
+
zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.SKIP);
-
+
}
-
+
public static boolean deleteLock(IZooReaderWriter zk, String path, String lockData) throws InterruptedException, KeeperException {
List<String> children;
-
+
children = zk.getChildren(path);
-
+
if (children == null || children.size() == 0) {
throw new IllegalStateException("No lock is held at " + path);
}
-
+
Collections.sort(children);
-
+
String lockNode = children.get(0);
-
+
if (!lockNode.startsWith(LOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
-
+
byte[] data = zk.getData(path + "/" + lockNode, null);
-
+
if (lockData.equals(new String(data, UTF_8))) {
zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.FAIL);
return true;
}
-
+
return false;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
index 38c7b64..2786c4f 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
@@ -209,5 +209,4 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP);
}
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java
index f1ba428..c5a0ce3 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java
@@ -27,11 +27,11 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
public class ZooReservation {
-
+
public static boolean attempt(IZooReaderWriter zk, String path, String reservationID, String debugInfo) throws KeeperException, InterruptedException {
if (reservationID.contains(":"))
throw new IllegalArgumentException();
-
+
while (true) {
try {
zk.putPersistentData(path, (reservationID + ":" + debugInfo).getBytes(UTF_8), NodeExistsPolicy.FAIL);
@@ -44,18 +44,18 @@ public class ZooReservation {
} catch (NoNodeException nne) {
continue;
}
-
+
String idInZoo = new String(zooData, UTF_8).split(":")[0];
-
+
return idInZoo.equals(reservationID);
}
}
-
+
}
-
+
public static void release(IZooReaderWriter zk, String path, String reservationID) throws KeeperException, InterruptedException {
byte[] zooData;
-
+
try {
zooData = zk.getData(path, null);
} catch (NoNodeException e) {
@@ -63,15 +63,15 @@ public class ZooReservation {
Logger.getLogger(ZooReservation.class).debug("Node does not exist " + path);
return;
}
-
+
String zooDataStr = new String(zooData, UTF_8);
String idInZoo = zooDataStr.split(":")[0];
-
+
if (!idInZoo.equals(reservationID)) {
throw new IllegalStateException("Tried to release reservation " + path + " with data mismatch " + reservationID + " " + zooDataStr);
}
-
+
zk.recursiveDelete(path, NodeMissingPolicy.SKIP);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
index 0059af7..6b5ec43 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
@@ -68,11 +68,16 @@ public class ZooSession {
}
/**
- * @param host comma separated list of zk servers
- * @param timeout in milliseconds
- * @param scheme authentication type, e.g. 'digest', may be null
- * @param auth authentication-scheme-specific token, may be null
- * @param watcher ZK notifications, may be null
+ * @param host
+ * comma separated list of zk servers
+ * @param timeout
+ * in milliseconds
+ * @param scheme
+ * authentication type, e.g. 'digest', may be null
+ * @param auth
+ * authentication-scheme-specific token, may be null
+ * @param watcher
+ * ZK notifications, may be null
*/
public static ZooKeeper connect(String host, int timeout, String scheme, byte[] auth, Watcher watcher) {
final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
@@ -99,7 +104,7 @@ public class ZooSession {
} catch (IOException e) {
if (e instanceof UnknownHostException) {
/*
- Make sure we wait at least as long as the JVM TTL for negative DNS responses
+ * Make sure we wait at least as long as the JVM TTL for negative DNS responses
*/
sleepTime = Math.max(sleepTime, (AddressUtil.getAddressCacheNegativeTtl((UnknownHostException) e) + 1) * 1000);
}
@@ -121,14 +126,13 @@ public class ZooSession {
if (tryAgain) {
if (startTime + 2 * timeout < System.currentTimeMillis() + sleepTime + connectTimeWait)
sleepTime = startTime + 2 * timeout - System.currentTimeMillis() - connectTimeWait;
- if (sleepTime < 0)
- {
+ if (sleepTime < 0) {
connectTimeWait -= sleepTime;
sleepTime = 0;
}
UtilWaitThread.sleep(sleepTime);
if (sleepTime < 10000)
- sleepTime = sleepTime + (long)(sleepTime * Math.random());
+ sleepTime = sleepTime + (long) (sleepTime * Math.random());
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java b/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java
index 4f5b112..518ab81 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java
@@ -25,17 +25,17 @@ import org.junit.Assert;
import org.junit.Test;
/**
- *
+ *
*/
public class AgeOffStoreTest {
-
+
private static class TestTimeSource implements TimeSource {
long time = 0;
-
+
public long currentTimeMillis() {
return time;
}
-
+
}
@Test
@@ -44,14 +44,14 @@ public class AgeOffStoreTest {
TestTimeSource tts = new TestTimeSource();
SimpleStore<String> sstore = new SimpleStore<String>();
AgeOffStore<String> aoStore = new AgeOffStore<String>(sstore, 10, tts);
-
+
aoStore.ageOff();
Long txid1 = aoStore.create();
aoStore.reserve(txid1);
aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
aoStore.unreserve(txid1, 0);
-
+
aoStore.ageOff();
Long txid2 = aoStore.create();
@@ -59,7 +59,7 @@ public class AgeOffStoreTest {
aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
aoStore.setStatus(txid2, TStatus.FAILED);
aoStore.unreserve(txid2, 0);
-
+
tts.time = 6;
Long txid3 = aoStore.create();
@@ -67,21 +67,21 @@ public class AgeOffStoreTest {
aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
aoStore.unreserve(txid3, 0);
-
+
Long txid4 = aoStore.create();
-
+
aoStore.ageOff();
Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
-
+
tts.time = 15;
-
+
aoStore.ageOff();
-
+
Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid3, txid4)), new HashSet<Long>(aoStore.list()));
Assert.assertEquals(3, new HashSet<Long>(aoStore.list()).size());
-
+
tts.time = 30;
aoStore.ageOff();
@@ -89,71 +89,71 @@ public class AgeOffStoreTest {
Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
}
-
+
@Test
public void testNonEmpty() {
// test age off when source store starts off non empty
-
+
TestTimeSource tts = new TestTimeSource();
SimpleStore<String> sstore = new SimpleStore<String>();
Long txid1 = sstore.create();
sstore.reserve(txid1);
sstore.setStatus(txid1, TStatus.IN_PROGRESS);
sstore.unreserve(txid1, 0);
-
+
Long txid2 = sstore.create();
sstore.reserve(txid2);
sstore.setStatus(txid2, TStatus.IN_PROGRESS);
sstore.setStatus(txid2, TStatus.FAILED);
sstore.unreserve(txid2, 0);
-
+
Long txid3 = sstore.create();
sstore.reserve(txid3);
sstore.setStatus(txid3, TStatus.IN_PROGRESS);
sstore.setStatus(txid3, TStatus.SUCCESSFUL);
sstore.unreserve(txid3, 0);
-
+
Long txid4 = sstore.create();
-
+
AgeOffStore<String> aoStore = new AgeOffStore<String>(sstore, 10, tts);
-
+
Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
-
+
aoStore.ageOff();
-
+
Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
-
+
tts.time = 15;
aoStore.ageOff();
-
+
Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
-
+
aoStore.reserve(txid1);
aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
aoStore.unreserve(txid1, 0);
-
+
tts.time = 30;
-
+
aoStore.ageOff();
-
+
Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
-
+
aoStore.reserve(txid1);
aoStore.setStatus(txid1, TStatus.FAILED);
aoStore.unreserve(txid1, 0);
-
+
aoStore.ageOff();
-
+
Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
-
+
tts.time = 42;
-
+
aoStore.ageOff();
Assert.assertEquals(0, new HashSet<Long>(aoStore.list()).size());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java b/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java
index c2d5f92..eea5f1b 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java
@@ -20,7 +20,6 @@ import java.util.Collections;
import java.util.EnumSet;
import org.apache.accumulo.fate.ReadOnlyTStore.TStatus;
-
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@@ -47,7 +46,7 @@ public class ReadOnlyStoreTest {
EasyMock.expect(mock.waitForStatusChange(0xdeadbeefl, EnumSet.allOf(TStatus.class))).andReturn(TStatus.UNKNOWN);
EasyMock.expect(mock.getProperty(0xdeadbeefl, "com.example.anyproperty")).andReturn("property");
- EasyMock.expect(mock.list()).andReturn(Collections.<Long>emptyList());
+ EasyMock.expect(mock.list()).andReturn(Collections.<Long> emptyList());
EasyMock.replay(repo);
EasyMock.replay(mock);
@@ -64,7 +63,7 @@ public class ReadOnlyStoreTest {
Assert.assertEquals(TStatus.UNKNOWN, store.waitForStatusChange(0xdeadbeefl, EnumSet.allOf(TStatus.class)));
Assert.assertEquals("property", store.getProperty(0xdeadbeefl, "com.example.anyproperty"));
- Assert.assertEquals(Collections.<Long>emptyList(), store.list());
+ Assert.assertEquals(Collections.<Long> emptyList(), store.list());
EasyMock.verify(repo);
EasyMock.verify(mock);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java b/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java
index 60eabfb..f0bac88 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java
@@ -31,62 +31,62 @@ import org.apache.commons.lang.NotImplementedException;
* Transient in memory store for transactions.
*/
public class SimpleStore<T> implements TStore<T> {
-
+
private long nextId = 1;
private Map<Long,TStatus> statuses = new HashMap<Long,TStore.TStatus>();
private Set<Long> reserved = new HashSet<Long>();
-
+
@Override
public long create() {
statuses.put(nextId, TStatus.NEW);
return nextId++;
}
-
+
@Override
public long reserve() {
throw new NotImplementedException();
}
-
+
@Override
public void reserve(long tid) {
if (reserved.contains(tid))
throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve twice... if test change, then change this
reserved.add(tid);
}
-
+
@Override
public void unreserve(long tid, long deferTime) {
if (!reserved.remove(tid)) {
throw new IllegalStateException();
}
}
-
+
@Override
public Repo<T> top(long tid) {
throw new NotImplementedException();
}
-
+
@Override
public void push(long tid, Repo<T> repo) throws StackOverflowException {
throw new NotImplementedException();
}
-
+
@Override
public void pop(long tid) {
throw new NotImplementedException();
}
-
+
@Override
public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) {
if (!reserved.contains(tid))
throw new IllegalStateException();
-
+
TStatus status = statuses.get(tid);
if (status == null)
return TStatus.UNKNOWN;
return status;
}
-
+
@Override
public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) {
if (!reserved.contains(tid))
@@ -95,32 +95,32 @@ public class SimpleStore<T> implements TStore<T> {
throw new IllegalStateException();
statuses.put(tid, status);
}
-
+
@Override
public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid, EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) {
throw new NotImplementedException();
}
-
+
@Override
public void setProperty(long tid, String prop, Serializable val) {
throw new NotImplementedException();
}
-
+
@Override
public Serializable getProperty(long tid, String prop) {
throw new NotImplementedException();
}
-
+
@Override
public void delete(long tid) {
if (!reserved.contains(tid))
throw new IllegalStateException();
statuses.remove(tid);
}
-
+
@Override
public List<Long> list() {
return new ArrayList<Long>(statuses.keySet());
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java b/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java
index 6e6a3c3..ab9032d 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
/**
* Test the AddressUtil class.
- *
+ *
*/
public class AddressUtilTest extends TestCase {
@@ -88,7 +88,7 @@ public class AddressUtilTest extends TestCase {
log.info("AddressUtil is (hopefully) going to spit out an error about DNS lookups. you can ignore it.");
AddressUtil.getAddressCacheNegativeTtl(null);
fail("The JVM Security settings cache DNS failures forever, this should cause an exception.");
- } catch(IllegalArgumentException exception) {
+ } catch (IllegalArgumentException exception) {
assertTrue(true);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java
index a9a8e3c..9e540e4 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java
@@ -16,31 +16,31 @@
*/
package org.apache.accumulo.fate.zookeeper;
+import static org.junit.Assert.assertEquals;
+
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
-import static org.junit.Assert.*;
-
import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock.QueueLock;
import org.junit.Test;
public class DistributedReadWriteLockTest {
-
+
// Non-zookeeper version of QueueLock
public static class MockQueueLock implements QueueLock {
-
+
long next = 0L;
final SortedMap<Long,byte[]> locks = new TreeMap<Long,byte[]>();
-
+
@Override
synchronized public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
SortedMap<Long,byte[]> result = new TreeMap<Long,byte[]>();
result.putAll(locks.headMap(entry + 1));
return result;
}
-
+
@Override
synchronized public void removeEntry(long entry) {
synchronized (locks) {
@@ -48,7 +48,7 @@ public class DistributedReadWriteLockTest {
locks.notifyAll();
}
}
-
+
@Override
synchronized public long addEntry(byte[] data) {
long result;
@@ -59,31 +59,31 @@ public class DistributedReadWriteLockTest {
return result;
}
}
-
+
// some data that is probably not going to update atomically
static class SomeData {
volatile int[] data = new int[100];
volatile int counter;
-
+
void read() {
for (int i = 0; i < data.length; i++)
assertEquals(counter, data[i]);
}
-
+
void write() {
++counter;
for (int i = data.length - 1; i >= 0; i--)
data[i] = counter;
}
}
-
+
@Test
public void testLock() throws Exception {
final SomeData data = new SomeData();
data.write();
data.read();
QueueLock qlock = new MockQueueLock();
-
+
final ReadWriteLock locker = new DistributedReadWriteLock(qlock, "locker1".getBytes());
final Lock readLock = locker.readLock();
final Lock writeLock = locker.writeLock();
@@ -93,7 +93,7 @@ public class DistributedReadWriteLockTest {
writeLock.unlock();
readLock.lock();
readLock.unlock();
-
+
// do a bunch of reads/writes in separate threads, look for inconsistent updates
Thread[] threads = new Thread[2];
for (int i = 0; i < threads.length; i++) {
@@ -128,5 +128,5 @@ public class DistributedReadWriteLockTest {
t.join();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
index 840d33b..0e4e329 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
@@ -23,15 +23,14 @@ import java.util.Map;
import java.util.concurrent.Callable;
import org.junit.Assert;
-
import org.junit.Test;
public class TransactionWatcherTest {
-
+
static class SimpleArbitrator implements TransactionWatcher.Arbitrator {
Map<String,List<Long>> started = new HashMap<String,List<Long>>();
Map<String,List<Long>> cleanedUp = new HashMap<String,List<Long>>();
-
+
public synchronized void start(String txType, Long txid) throws Exception {
List<Long> txids = started.get(txType);
if (txids == null)
@@ -40,7 +39,7 @@ public class TransactionWatcherTest {
throw new Exception("transaction already started");
txids.add(txid);
started.put(txType, txids);
-
+
txids = cleanedUp.get(txType);
if (txids == null)
txids = new ArrayList<Long>();
@@ -49,7 +48,7 @@ public class TransactionWatcherTest {
txids.add(txid);
cleanedUp.put(txType, txids);
}
-
+
public synchronized void stop(String txType, Long txid) throws Exception {
List<Long> txids = started.get(txType);
if (txids != null && txids.contains(txid)) {
@@ -58,7 +57,7 @@ public class TransactionWatcherTest {
}
throw new Exception("transaction does not exist");
}
-
+
public synchronized void cleanup(String txType, Long txid) throws Exception {
List<Long> txids = cleanedUp.get(txType);
if (txids != null && txids.contains(txid)) {
@@ -67,7 +66,7 @@ public class TransactionWatcherTest {
}
throw new Exception("transaction does not exist");
}
-
+
@Override
synchronized public boolean transactionAlive(String txType, long tid) throws Exception {
List<Long> txids = started.get(txType);
@@ -83,9 +82,9 @@ public class TransactionWatcherTest {
return true;
return !txids.contains(tid);
}
-
+
}
-
+
@Test
public void testTransactionWatcher() throws Exception {
final String txType = "someName";
@@ -109,12 +108,12 @@ public class TransactionWatcherTest {
}
});
Assert.assertFalse(txw.isActive(txid));
- Assert.assertFalse(sa.transactionComplete(txType, txid));
+ Assert.assertFalse(sa.transactionComplete(txType, txid));
sa.stop(txType, txid);
Assert.assertFalse(sa.transactionAlive(txType, txid));
- Assert.assertFalse(sa.transactionComplete(txType, txid));
+ Assert.assertFalse(sa.transactionComplete(txType, txid));
sa.cleanup(txType, txid);
- Assert.assertTrue(sa.transactionComplete(txType, txid));
+ Assert.assertTrue(sa.transactionComplete(txType, txid));
try {
txw.run(txType, txid, new Callable<Object>() {
@Override
@@ -150,7 +149,7 @@ public class TransactionWatcherTest {
return null;
}
});
-
+
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java
index e7dffc1..19d8f67 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java
@@ -16,15 +16,16 @@
*/
package org.apache.accumulo.fate.zookeeper;
-import org.apache.zookeeper.Watcher;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
import static org.easymock.EasyMock.createMock;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
+import org.apache.zookeeper.Watcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
public class ZooCacheFactoryTest {
private ZooCacheFactory zcf;
@@ -65,6 +66,7 @@ public class ZooCacheFactoryTest {
ZooCache zc1 = zcf.getZooCache(zks1, timeout1, watcher);
assertNotNull(zc1);
}
+
@Test
public void testGetZooCacheWatcher_Null() {
String zks1 = "zk1";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
index e3db785..5dd6f61 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
@@ -16,22 +16,6 @@
*/
package org.apache.accumulo.fate.zookeeper;
-import java.util.List;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import org.easymock.Capture;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
@@ -41,6 +25,23 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
public class ZooCacheTest {
private static final String ZPATH = "/some/path/in/zk";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java
index 59fb498..9203b39 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java
@@ -96,7 +96,7 @@ public class ZooReaderWriterTest {
@Test(expected = SessionExpiredException.class)
public void testMutateNodeCreationFails() throws Exception {
final String path = "/foo";
- final byte[] value = new byte[]{0};
+ final byte[] value = new byte[] {0};
final List<ACL> acls = Collections.<ACL> emptyList();
Mutator mutator = new Mutator() {
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java
index fae7e6e..82d3d1d 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java
@@ -21,10 +21,10 @@ import org.junit.Test;
public class ZooSessionTest {
- private static final int MINIMUM_TIMEOUT=10000;
+ private static final int MINIMUM_TIMEOUT = 10000;
private static final String UNKNOWN_HOST = "hostname.that.should.not.exist.example.com:2181";
- @Test(expected=RuntimeException.class, timeout=MINIMUM_TIMEOUT*4)
+ @Test(expected = RuntimeException.class, timeout = MINIMUM_TIMEOUT * 4)
public void testUnknownHost() throws Exception {
ZooKeeper session = ZooSession.connect(UNKNOWN_HOST, MINIMUM_TIMEOUT, null, null, null);
session.close();