You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/04/10 19:45:58 UTC

svn commit: r1466589 - in /accumulo/trunk: ./ assemble/ core/ examples/ fate/src/main/java/org/apache/accumulo/fate/ fate/src/main/java/org/apache/accumulo/fate/zookeeper/ server/ server/src/main/java/org/apache/accumulo/server/constraints/ server/src/...

Author: kturner
Date: Wed Apr 10 17:45:58 2013
New Revision: 1466589

URL: http://svn.apache.org/r1466589
Log:
ACCUMULO-1044 fixed some issues w/ metadata constraint bulk flag check, made the check more strict, and added a lot test for it

Modified:
    accumulo/trunk/   (props changed)
    accumulo/trunk/assemble/   (props changed)
    accumulo/trunk/core/   (props changed)
    accumulo/trunk/examples/   (props changed)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java   (props changed)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java   (props changed)
    accumulo/trunk/server/   (props changed)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
    accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
    accumulo/trunk/src/   (props changed)

Propchange: accumulo/trunk/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5:r1466582

Propchange: accumulo/trunk/assemble/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/assemble:r1466582

Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/core:r1466582

Propchange: accumulo/trunk/examples/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/examples:r1466582

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1466582

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1466582

Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/server:r1466582

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java?rev=1466589&r1=1466588&r2=1466589&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java Wed Apr 10 17:45:58 2013
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.accumulo.server.zookeeper.ZooCache;
@@ -139,6 +140,8 @@ public class MetadataConstraints impleme
       violations = addViolation(violations, 5);
     }
     
+    boolean checkedBulk = false;
+
     for (ColumnUpdate columnUpdate : colUpdates) {
       Text columnFamily = new Text(columnUpdate.getColumnFamily());
       
@@ -168,7 +171,7 @@ public class MetadataConstraints impleme
       } else if (columnFamily.equals(Constants.METADATA_SCANFILE_COLUMN_FAMILY)) {
         
       } else if (columnFamily.equals(Constants.METADATA_BULKFILE_COLUMN_FAMILY)) {
-        if (!columnUpdate.isDeleted()) {
+        if (!columnUpdate.isDeleted() && !checkedBulk) {
           // splits, which also write the time reference, are allowed to write this reference even when
           // the transaction is not running because the other half of the tablet is holding a reference
           // to the file.
@@ -177,26 +180,42 @@ public class MetadataConstraints impleme
           // but it writes everything.  We allow it to re-write the bulk information if it is setting the location. 
           // See ACCUMULO-1230. 
           boolean isLocationMutation = false;
+          
+          HashSet<Text> dataFiles = new HashSet<Text>();
+          HashSet<Text> loadedFiles = new HashSet<Text>();
+
+          String tidString = new String(columnUpdate.getValue());
+          int otherTidCount = 0;
+
           for (ColumnUpdate update : mutation.getUpdates()) {
             if (new ColumnFQ(update).equals(Constants.METADATA_DIRECTORY_COLUMN)) {
               isSplitMutation = true;
-            }
-            if (update.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
+            } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
               isLocationMutation = true;
+            } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
+              dataFiles.add(new Text(update.getColumnQualifier()));
+            } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_BULKFILE_COLUMN_FAMILY)) {
+              loadedFiles.add(new Text(update.getColumnQualifier()));
+              
+              if (!new String(update.getValue()).equals(tidString)) {
+                otherTidCount++;
+              }
             }
           }
           
           if (!isSplitMutation && !isLocationMutation) {
-            String tidString = new String(columnUpdate.getValue());
             long tid = Long.parseLong(tidString);
+            
             try {
-              if (!new ZooArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
+              if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
                 violations = addViolation(violations, 8);
               }
             } catch (Exception ex) {
               violations = addViolation(violations, 8);
             }
           }
+          
+          checkedBulk = true;
         }
       } else {
         if (!isValidColumn(columnUpdate)) {
@@ -248,6 +267,10 @@ public class MetadataConstraints impleme
     return violations;
   }
   
+  protected Arbitrator getArbitrator() {
+    return new ZooArbitrator();
+  }
+
   public String getViolationDescription(short violationCode) {
     switch (violationCode) {
       case 1:

Modified: accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java?rev=1466589&r1=1466588&r2=1466589&view=diff
==============================================================================
--- accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java (original)
+++ accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java Wed Apr 10 17:45:58 2013
@@ -18,6 +18,7 @@ package org.apache.accumulo.server.const
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import java.util.List;
 
@@ -25,6 +26,7 @@ import org.apache.accumulo.core.Constant
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -32,6 +34,26 @@ import org.junit.Test;
 
 public class MetadataConstraintsTest {
   
+  static class TestMetadataConstraints extends MetadataConstraints {
+    @Override
+    protected Arbitrator getArbitrator() {
+      return new Arbitrator() {
+        
+        @Override
+        public boolean transactionAlive(String type, long tid) throws Exception {
+          if (tid == 9)
+            throw new RuntimeException("txid 9 reserved for future use");
+          return tid == 5 || tid == 7;
+        }
+        
+        @Override
+        public boolean transactionComplete(String type, long tid) throws Exception {
+          return tid != 5 && tid != 7;
+        }
+      };
+    }
+  }
+
   @Test
   public void testCheck() {
     Logger.getLogger(AccumuloConfiguration.class).setLevel(Level.ERROR);
@@ -106,13 +128,112 @@ public class MetadataConstraintsTest {
     assertEquals(1, violations.size());
     assertEquals(Short.valueOf((short) 4), violations.get(0));
 
+  }
+  
+  @Test
+  public void testBulkFileCheck() {
+    MetadataConstraints mc = new TestMetadataConstraints();
+    Mutation m;
+    List<Short> violations;
+
+    // inactive txid
     m = new Mutation(new Text("0;foo"));
     m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
     violations = mc.check(null, m);
-    
     assertNotNull(violations);
     assertEquals(1, violations.size());
     assertEquals(Short.valueOf((short)8), violations.get(0));
+    
+    // txid that throws exception
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("9".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // active txid w/ file
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // active txid w/o file
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // two active txids w/ files
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("7".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+
+    // two files w/ one active txid
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+
+    // two loaded w/ one active txid and one file
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("5".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+
+    // active txid, mutation that looks like split
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+    Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // inactive txid, mutation that looks like split
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes()));
+    Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // active txid, mutation that looks like a load
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("789"), new Value("127.0.0.1:9997".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // inactive txid, mutation that looks like a load
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes()));
+    m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("789"), new Value("127.0.0.1:9997".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // deleting a load flag
+    m = new Mutation(new Text("0;foo"));
+    m.putDelete(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"));
+    violations = mc.check(null, m);
+    assertNull(violations);
+
+
   }
   
 }

Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/src:r1466582