You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/04/09 22:13:28 UTC

svn commit: r1466203 - in /accumulo/branches/1.5: ./ assemble/ core/ fate/src/main/java/org/apache/accumulo/fate/ fate/src/main/java/org/apache/accumulo/fate/zookeeper/ fate/src/test/java/org/apache/accumulo/fate/zookeeper/ server/ server/src/main/java...

Author: ecn
Date: Tue Apr  9 20:13:27 2013
New Revision: 1466203

URL: http://svn.apache.org/r1466203
Log:
ACCUMULO-1044 clearly define "transaction should stop" and "transaction is complete"

Modified:
    accumulo/branches/1.5/   (props changed)
    accumulo/branches/1.5/assemble/   (props changed)
    accumulo/branches/1.5/core/   (props changed)
    accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java   (props changed)
    accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
    accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java   (props changed)
    accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
    accumulo/branches/1.5/server/   (props changed)
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
    accumulo/branches/1.5/src/   (props changed)

Propchange: accumulo/branches/1.5/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4:r1466199

Propchange: accumulo/branches/1.5/assemble/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/assemble:r1466199

Propchange: accumulo/branches/1.5/core/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/core:r1466199

Propchange: accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1466199

Modified: accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java (original)
+++ accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java Tue Apr  9 20:13:27 2013
@@ -31,6 +31,7 @@ public class TransactionWatcher {
   
   public interface Arbitrator {
     boolean transactionAlive(String type, long tid) throws Exception;
+    boolean transactionComplete(String type, long tid) throws Exception;
   }
   
   public TransactionWatcher(Arbitrator arbitrator) {

Propchange: accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1466199

Modified: accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java (original)
+++ accumulo/branches/1.5/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java Tue Apr  9 20:13:27 2013
@@ -29,20 +29,38 @@ import org.junit.Test;
 public class TransactionWatcherTest {
   
   static class SimpleArbitrator implements TransactionWatcher.Arbitrator {
-    Map<String,List<Long>> map = new HashMap<String,List<Long>>();
+    Map<String,List<Long>> started = new HashMap<String,List<Long>>();
+    Map<String,List<Long>> cleanedUp = new HashMap<String,List<Long>>();
     
     public synchronized void start(String txType, Long txid) throws Exception {
-      List<Long> txids = map.get(txType);
+      List<Long> txids = started.get(txType);
       if (txids == null)
         txids = new ArrayList<Long>();
       if (txids.contains(txid))
         throw new Exception("transaction already started");
       txids.add(txid);
-      map.put(txType, txids);
+      started.put(txType, txids);
+      
+      txids = cleanedUp.get(txType);
+      if (txids == null)
+        txids = new ArrayList<Long>();
+      if (txids.contains(txid))
+        throw new IllegalStateException("transaction was started but not cleaned up");
+      txids.add(txid);
+      cleanedUp.put(txType, txids);
     }
     
     public synchronized void stop(String txType, Long txid) throws Exception {
-      List<Long> txids = map.get(txType);
+      List<Long> txids = started.get(txType);
+      if (txids != null && txids.contains(txid)) {
+        txids.remove(txids.indexOf(txid));
+        return;
+      }
+      throw new Exception("transaction does not exist");
+    }
+    
+    public synchronized void cleanup(String txType, Long txid) throws Exception {
+      List<Long> txids = cleanedUp.get(txType);
       if (txids != null && txids.contains(txid)) {
         txids.remove(txids.indexOf(txid));
         return;
@@ -52,11 +70,19 @@ public class TransactionWatcherTest {
     
     @Override
     synchronized public boolean transactionAlive(String txType, long tid) throws Exception {
-      List<Long> txids = map.get(txType);
+      List<Long> txids = started.get(txType);
       if (txids == null)
         return false;
       return txids.contains(tid);
     }
+
+    @Override
+    public boolean transactionComplete(String txType, long tid) throws Exception {
+      List<Long> txids = cleanedUp.get(txType);
+      if (txids == null)
+        return true;
+      return !txids.contains(tid);
+    }
     
   }
   
@@ -83,8 +109,12 @@ public class TransactionWatcherTest {
       }
     });
     Assert.assertFalse(txw.isActive(txid));
+    Assert.assertFalse(sa.transactionComplete(txType,  txid));
     sa.stop(txType, txid);
     Assert.assertFalse(sa.transactionAlive(txType, txid));
+    Assert.assertFalse(sa.transactionComplete(txType,  txid));
+    sa.cleanup(txType, txid);
+    Assert.assertTrue(sa.transactionComplete(txType,  txid));
     try {
       txw.run(txType, txid, new Callable<Object>() {
         @Override

Propchange: accumulo/branches/1.5/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/server:r1466199

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java Tue Apr  9 20:13:27 2013
@@ -178,8 +178,8 @@ public class MetadataConstraints impleme
           // See ACCUMULO-1230. 
           boolean isLocationMutation = false;
           for (ColumnUpdate update : mutation.getUpdates()) {
-            if (new ColumnFQ(update).equals(Constants.METADATA_TIME_COLUMN)) {
-              isSplitMutation = true;
+            if (new ColumnFQ(update).equals(Constants.METADATA_SPLIT_RATIO_COLUMN)) {
+              isSplitMutation = update.isDeleted();
             }
             if (update.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
               isLocationMutation = true;

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java Tue Apr  9 20:13:27 2013
@@ -50,7 +50,7 @@ public class MetadataBulkLoadFilter exte
       Status status = bulkTxStatusCache.get(txid);
       if (status == null) {
         try {
-          if (arbitrator.transactionAlive(Constants.BULK_ARBITRATOR_TYPE, txid)) {
+          if (arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid)) {
             status = Status.ACTIVE;
           } else {
             status = Status.INACTIVE;

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Apr  9 20:13:27 2013
@@ -302,6 +302,8 @@ class CleanUpBulkImport extends MasterRe
     Utils.unreserveHdfsDirectory(source, tid);
     Utils.unreserveHdfsDirectory(error, tid);
     Utils.getReadLock(tableId, tid).unlock();
+    log.debug("completing bulk import transaction " + tid);
+    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
     return null;
   }
 }

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java?rev=1466203&r1=1466202&r2=1466203&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java Tue Apr  9 20:13:27 2013
@@ -33,7 +33,7 @@ public class TransactionWatcher extends 
     
     @Override
     public boolean transactionAlive(String type, long tid) throws Exception {
-      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + Long.toString(tid);
+      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid;
       rdr.sync(path);
       return rdr.exists(path);
     }
@@ -43,6 +43,7 @@ public class TransactionWatcher extends 
       IZooReaderWriter writer = ZooReaderWriter.getInstance();
       writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type, new byte[] {}, NodeExistsPolicy.OVERWRITE);
       writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, new byte[] {}, NodeExistsPolicy.OVERWRITE);
+      writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", new byte[] {}, NodeExistsPolicy.OVERWRITE);
     }
     
     public static void stop(String type, long tid) throws KeeperException, InterruptedException {
@@ -50,6 +51,20 @@ public class TransactionWatcher extends 
       IZooReaderWriter writer = ZooReaderWriter.getInstance();
       writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, NodeMissingPolicy.SKIP);
     }
+    
+    public static void cleanup(String type, long tid) throws KeeperException, InterruptedException {
+      Instance instance = HdfsZooInstance.getInstance();
+      IZooReaderWriter writer = ZooReaderWriter.getInstance();
+      writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, NodeMissingPolicy.SKIP);
+      writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", NodeMissingPolicy.SKIP);
+    }
+
+    @Override
+    public boolean transactionComplete(String type, long tid) throws Exception {
+      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running";
+      rdr.sync(path);
+      return !rdr.exists(path);
+    }
   }
   
   public TransactionWatcher() {

Propchange: accumulo/branches/1.5/src/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1466199