You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/04/09 22:13:28 UTC
svn commit: r1466203 - in /accumulo/branches/1.5: ./ assemble/ core/
fate/src/main/java/org/apache/accumulo/fate/
fate/src/main/java/org/apache/accumulo/fate/zookeeper/
fate/src/test/java/org/apache/accumulo/fate/zookeeper/ server/
server/src/main/java...
Author: ecn
Date: Tue Apr 9 20:13:27 2013
New Revision: 1466203
URL: http://svn.apache.org/r1466203
Log:
ACCUMULO-1044 clearly define "transaction should stop" and "transaction is complete"
Modified:
accumulo/branches/1.5/ (props changed)
accumulo/branches/1.5/assemble/ (props changed)
accumulo/branches/1.5/core/ (props changed)
accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (props changed)
accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (props changed)
accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
accumulo/branches/1.5/server/ (props changed)
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
accumulo/branches/1.5/src/ (props changed)
Propchange: accumulo/branches/1.5/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.4:r1466199
Propchange: accumulo/branches/1.5/assemble/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.4/assemble:r1466199
Propchange: accumulo/branches/1.5/core/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.4/core:r1466199
Propchange: accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1466199
Modified: accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java (original)
+++ accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java Tue Apr 9 20:13:27 2013
@@ -31,6 +31,7 @@ public class TransactionWatcher {
public interface Arbitrator {
boolean transactionAlive(String type, long tid) throws Exception;
+ boolean transactionComplete(String type, long tid) throws Exception;
}
public TransactionWatcher(Arbitrator arbitrator) {
Propchange: accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1466199
Modified: accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java (original)
+++ accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java Tue Apr 9 20:13:27 2013
@@ -29,20 +29,38 @@ import org.junit.Test;
public class TransactionWatcherTest {
static class SimpleArbitrator implements TransactionWatcher.Arbitrator {
- Map<String,List<Long>> map = new HashMap<String,List<Long>>();
+ Map<String,List<Long>> started = new HashMap<String,List<Long>>();
+ Map<String,List<Long>> cleanedUp = new HashMap<String,List<Long>>();
public synchronized void start(String txType, Long txid) throws Exception {
- List<Long> txids = map.get(txType);
+ List<Long> txids = started.get(txType);
if (txids == null)
txids = new ArrayList<Long>();
if (txids.contains(txid))
throw new Exception("transaction already started");
txids.add(txid);
- map.put(txType, txids);
+ started.put(txType, txids);
+
+ txids = cleanedUp.get(txType);
+ if (txids == null)
+ txids = new ArrayList<Long>();
+ if (txids.contains(txid))
+ throw new IllegalStateException("transaction was started but not cleaned up");
+ txids.add(txid);
+ cleanedUp.put(txType, txids);
}
public synchronized void stop(String txType, Long txid) throws Exception {
- List<Long> txids = map.get(txType);
+ List<Long> txids = started.get(txType);
+ if (txids != null && txids.contains(txid)) {
+ txids.remove(txids.indexOf(txid));
+ return;
+ }
+ throw new Exception("transaction does not exist");
+ }
+
+ public synchronized void cleanup(String txType, Long txid) throws Exception {
+ List<Long> txids = cleanedUp.get(txType);
if (txids != null && txids.contains(txid)) {
txids.remove(txids.indexOf(txid));
return;
@@ -52,11 +70,19 @@ public class TransactionWatcherTest {
@Override
synchronized public boolean transactionAlive(String txType, long tid) throws Exception {
- List<Long> txids = map.get(txType);
+ List<Long> txids = started.get(txType);
if (txids == null)
return false;
return txids.contains(tid);
}
+
+ @Override
+ public boolean transactionComplete(String txType, long tid) throws Exception {
+ List<Long> txids = cleanedUp.get(txType);
+ if (txids == null)
+ return true;
+ return !txids.contains(tid);
+ }
}
@@ -83,8 +109,12 @@ public class TransactionWatcherTest {
}
});
Assert.assertFalse(txw.isActive(txid));
+ Assert.assertFalse(sa.transactionComplete(txType, txid));
sa.stop(txType, txid);
Assert.assertFalse(sa.transactionAlive(txType, txid));
+ Assert.assertFalse(sa.transactionComplete(txType, txid));
+ sa.cleanup(txType, txid);
+ Assert.assertTrue(sa.transactionComplete(txType, txid));
try {
txw.run(txType, txid, new Callable<Object>() {
@Override
Propchange: accumulo/branches/1.5/server/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.4/server:r1466199
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java Tue Apr 9 20:13:27 2013
@@ -178,8 +178,8 @@ public class MetadataConstraints impleme
// See ACCUMULO-1230.
boolean isLocationMutation = false;
for (ColumnUpdate update : mutation.getUpdates()) {
- if (new ColumnFQ(update).equals(Constants.METADATA_TIME_COLUMN)) {
- isSplitMutation = true;
+ if (new ColumnFQ(update).equals(Constants.METADATA_SPLIT_RATIO_COLUMN)) {
+ isSplitMutation = update.isDeleted();
}
if (update.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
isLocationMutation = true;
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java Tue Apr 9 20:13:27 2013
@@ -50,7 +50,7 @@ public class MetadataBulkLoadFilter exte
Status status = bulkTxStatusCache.get(txid);
if (status == null) {
try {
- if (arbitrator.transactionAlive(Constants.BULK_ARBITRATOR_TYPE, txid)) {
+ if (arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid)) {
status = Status.ACTIVE;
} else {
status = Status.INACTIVE;
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Apr 9 20:13:27 2013
@@ -302,6 +302,8 @@ class CleanUpBulkImport extends MasterRe
Utils.unreserveHdfsDirectory(source, tid);
Utils.unreserveHdfsDirectory(error, tid);
Utils.getReadLock(tableId, tid).unlock();
+ log.debug("completing bulk import transaction " + tid);
+ ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
return null;
}
}
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java Tue Apr 9 20:13:27 2013
@@ -33,7 +33,7 @@ public class TransactionWatcher extends
@Override
public boolean transactionAlive(String type, long tid) throws Exception {
- String path = ZooUtil.getRoot(instance) + "/" + type + "/" + Long.toString(tid);
+ String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid;
rdr.sync(path);
return rdr.exists(path);
}
@@ -43,6 +43,7 @@ public class TransactionWatcher extends
IZooReaderWriter writer = ZooReaderWriter.getInstance();
writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type, new byte[] {}, NodeExistsPolicy.OVERWRITE);
writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, new byte[] {}, NodeExistsPolicy.OVERWRITE);
+ writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", new byte[] {}, NodeExistsPolicy.OVERWRITE);
}
public static void stop(String type, long tid) throws KeeperException, InterruptedException {
@@ -50,6 +51,20 @@ public class TransactionWatcher extends
IZooReaderWriter writer = ZooReaderWriter.getInstance();
writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, NodeMissingPolicy.SKIP);
}
+
+ public static void cleanup(String type, long tid) throws KeeperException, InterruptedException {
+ Instance instance = HdfsZooInstance.getInstance();
+ IZooReaderWriter writer = ZooReaderWriter.getInstance();
+ writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, NodeMissingPolicy.SKIP);
+ writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", NodeMissingPolicy.SKIP);
+ }
+
+ @Override
+ public boolean transactionComplete(String type, long tid) throws Exception {
+ String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running";
+ rdr.sync(path);
+ return !rdr.exists(path);
+ }
}
public TransactionWatcher() {
Propchange: accumulo/branches/1.5/src/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.4/src:r1466199