You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/04/10 19:45:58 UTC
svn commit: r1466589 - in /accumulo/trunk: ./ assemble/ core/ examples/
fate/src/main/java/org/apache/accumulo/fate/
fate/src/main/java/org/apache/accumulo/fate/zookeeper/ server/
server/src/main/java/org/apache/accumulo/server/constraints/ server/src/...
Author: kturner
Date: Wed Apr 10 17:45:58 2013
New Revision: 1466589
URL: http://svn.apache.org/r1466589
Log:
ACCUMULO-1044 fixed some issues w/ metadata constraint bulk flag check, made the check more strict, and added a lot test for it
Modified:
accumulo/trunk/ (props changed)
accumulo/trunk/assemble/ (props changed)
accumulo/trunk/core/ (props changed)
accumulo/trunk/examples/ (props changed)
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (props changed)
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (props changed)
accumulo/trunk/server/ (props changed)
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
accumulo/trunk/src/ (props changed)
Propchange: accumulo/trunk/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5:r1466582
Propchange: accumulo/trunk/assemble/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/assemble:r1466582
Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/core:r1466582
Propchange: accumulo/trunk/examples/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/examples:r1466582
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1466582
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1466582
Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/server:r1466582
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java?rev=1466589&r1=1466588&r2=1466589&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java Wed Apr 10 17:45:58 2013
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
import org.apache.accumulo.server.zookeeper.ZooCache;
@@ -139,6 +140,8 @@ public class MetadataConstraints impleme
violations = addViolation(violations, 5);
}
+ boolean checkedBulk = false;
+
for (ColumnUpdate columnUpdate : colUpdates) {
Text columnFamily = new Text(columnUpdate.getColumnFamily());
@@ -168,7 +171,7 @@ public class MetadataConstraints impleme
} else if (columnFamily.equals(Constants.METADATA_SCANFILE_COLUMN_FAMILY)) {
} else if (columnFamily.equals(Constants.METADATA_BULKFILE_COLUMN_FAMILY)) {
- if (!columnUpdate.isDeleted()) {
+ if (!columnUpdate.isDeleted() && !checkedBulk) {
// splits, which also write the time reference, are allowed to write this reference even when
// the transaction is not running because the other half of the tablet is holding a reference
// to the file.
@@ -177,26 +180,42 @@ public class MetadataConstraints impleme
// but it writes everything. We allow it to re-write the bulk information if it is setting the location.
// See ACCUMULO-1230.
boolean isLocationMutation = false;
+
+ HashSet<Text> dataFiles = new HashSet<Text>();
+ HashSet<Text> loadedFiles = new HashSet<Text>();
+
+ String tidString = new String(columnUpdate.getValue());
+ int otherTidCount = 0;
+
for (ColumnUpdate update : mutation.getUpdates()) {
if (new ColumnFQ(update).equals(Constants.METADATA_DIRECTORY_COLUMN)) {
isSplitMutation = true;
- }
- if (update.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
+ } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
isLocationMutation = true;
+ } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
+ dataFiles.add(new Text(update.getColumnQualifier()));
+ } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_BULKFILE_COLUMN_FAMILY)) {
+ loadedFiles.add(new Text(update.getColumnQualifier()));
+
+ if (!new String(update.getValue()).equals(tidString)) {
+ otherTidCount++;
+ }
}
}
if (!isSplitMutation && !isLocationMutation) {
- String tidString = new String(columnUpdate.getValue());
long tid = Long.parseLong(tidString);
+
try {
- if (!new ZooArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
+ if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
violations = addViolation(violations, 8);
}
} catch (Exception ex) {
violations = addViolation(violations, 8);
}
}
+
+ checkedBulk = true;
}
} else {
if (!isValidColumn(columnUpdate)) {
@@ -248,6 +267,10 @@ public class MetadataConstraints impleme
return violations;
}
+ protected Arbitrator getArbitrator() {
+ return new ZooArbitrator();
+ }
+
public String getViolationDescription(short violationCode) {
switch (violationCode) {
case 1:
Modified: accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java?rev=1466589&r1=1466588&r2=1466589&view=diff
==============================================================================
--- accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java (original)
+++ accumulo/trunk/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java Wed Apr 10 17:45:58 2013
@@ -18,6 +18,7 @@ package org.apache.accumulo.server.const
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import java.util.List;
@@ -25,6 +26,7 @@ import org.apache.accumulo.core.Constant
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -32,6 +34,26 @@ import org.junit.Test;
public class MetadataConstraintsTest {
+ static class TestMetadataConstraints extends MetadataConstraints {
+ @Override
+ protected Arbitrator getArbitrator() {
+ return new Arbitrator() {
+
+ @Override
+ public boolean transactionAlive(String type, long tid) throws Exception {
+ if (tid == 9)
+ throw new RuntimeException("txid 9 reserved for future use");
+ return tid == 5 || tid == 7;
+ }
+
+ @Override
+ public boolean transactionComplete(String type, long tid) throws Exception {
+ return tid != 5 && tid != 7;
+ }
+ };
+ }
+ }
+
@Test
public void testCheck() {
Logger.getLogger(AccumuloConfiguration.class).setLevel(Level.ERROR);
@@ -106,13 +128,112 @@ public class MetadataConstraintsTest {
assertEquals(1, violations.size());
assertEquals(Short.valueOf((short) 4), violations.get(0));
+ }
+
+ @Test
+ public void testBulkFileCheck() {
+ MetadataConstraints mc = new TestMetadataConstraints();
+ Mutation m;
+ List<Short> violations;
+
+ // inactive txid
m = new Mutation(new Text("0;foo"));
m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
violations = mc.check(null, m);
-
assertNotNull(violations);
assertEquals(1, violations.size());
assertEquals(Short.valueOf((short)8), violations.get(0));
+
+ // txid that throws exception
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("9".getBytes()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+ violations = mc.check(null, m);
+ assertNotNull(violations);
+ assertEquals(1, violations.size());
+ assertEquals(Short.valueOf((short) 8), violations.get(0));
+
+ // active txid w/ file
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+ violations = mc.check(null, m);
+ assertNull(violations);
+
+ // active txid w/o file
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+ violations = mc.check(null, m);
+ assertNotNull(violations);
+ assertEquals(1, violations.size());
+ assertEquals(Short.valueOf((short) 8), violations.get(0));
+
+ // two active txids w/ files
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("7".getBytes()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("1,1".getBytes()));
+ violations = mc.check(null, m);
+ assertNotNull(violations);
+ assertEquals(1, violations.size());
+ assertEquals(Short.valueOf((short) 8), violations.get(0));
+
+ // two files w/ one active txid
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("5".getBytes()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("1,1".getBytes()));
+ violations = mc.check(null, m);
+ assertNull(violations);
+
+ // two loaded w/ one active txid and one file
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes()));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("5".getBytes()));
+ violations = mc.check(null, m);
+ assertNotNull(violations);
+ assertEquals(1, violations.size());
+ assertEquals(Short.valueOf((short) 8), violations.get(0));
+
+ // active txid, mutation that looks like split
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+ Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+ violations = mc.check(null, m);
+ assertNull(violations);
+
+ // inactive txid, mutation that looks like split
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes()));
+ Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+ violations = mc.check(null, m);
+ assertNull(violations);
+
+ // active txid, mutation that looks like a load
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes()));
+ m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("789"), new Value("127.0.0.1:9997".getBytes()));
+ violations = mc.check(null, m);
+ assertNull(violations);
+
+ // inactive txid, mutation that looks like a load
+ m = new Mutation(new Text("0;foo"));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes()));
+ m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("789"), new Value("127.0.0.1:9997".getBytes()));
+ violations = mc.check(null, m);
+ assertNull(violations);
+
+ // deleting a load flag
+ m = new Mutation(new Text("0;foo"));
+ m.putDelete(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"));
+ violations = mc.check(null, m);
+ assertNull(violations);
+
+
}
}
Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/src:r1466582