You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2016/06/02 16:27:52 UTC
[1/2] accumulo git commit: ACCUMULO-4318 Made writers and scanners auto closeable
Repository: accumulo
Updated Branches:
refs/heads/1.8 61a7de4a1 -> e67317cb2
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
index 4018d78..a3eed9f 100644
--- a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -141,77 +141,78 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().create(tableName);
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
-
- // mutation conditional on column tx:seq not existing
- ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
- cm0.put("name", "last", "doe");
- cm0.put("name", "first", "john");
- cm0.put("tx", "seq", "1");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
- Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
-
- // mutation conditional on column tx:seq being 1
- ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
- cm1.put("name", "last", "Doe");
- cm1.put("tx", "seq", "2");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
-
- // test condition where value differs
- ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
- cm2.put("name", "last", "DOE");
- cm2.put("tx", "seq", "2");
- Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
-
- // test condition where column does not exists
- ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("txtypo", "seq").setValue("1"));
- cm3.put("name", "last", "deo");
- cm3.put("tx", "seq", "2");
- Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
-
- // test two conditions, where one should fail
- ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("doe"));
- cm4.put("name", "last", "deo");
- cm4.put("tx", "seq", "3");
- Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
-
- // test two conditions, where one should fail
- ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"), new Condition("name", "last").setValue("Doe"));
- cm5.put("name", "last", "deo");
- cm5.put("tx", "seq", "3");
- Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
-
- // ensure rejected mutations did not write
- Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
- scanner.fetchColumn(new Text("name"), new Text("last"));
- scanner.setRange(new Range("99006"));
- Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("Doe", entry.getValue().toString());
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
+
+ // mutation conditional on column tx:seq not existing
+ ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
+ cm0.put("name", "last", "doe");
+ cm0.put("name", "first", "john");
+ cm0.put("tx", "seq", "1");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+ Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
+
+ // mutation conditional on column tx:seq being 1
+ ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
+ cm1.put("name", "last", "Doe");
+ cm1.put("tx", "seq", "2");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+
+ // test condition where value differs
+ ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
+ cm2.put("name", "last", "DOE");
+ cm2.put("tx", "seq", "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
+
+ // test condition where column does not exists
+ ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("txtypo", "seq").setValue("1"));
+ cm3.put("name", "last", "deo");
+ cm3.put("tx", "seq", "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
+
+ // test two conditions, where one should fail
+ ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("doe"));
+ cm4.put("name", "last", "deo");
+ cm4.put("tx", "seq", "3");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
+
+ // test two conditions, where one should fail
+ ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"), new Condition("name", "last").setValue("Doe"));
+ cm5.put("name", "last", "deo");
+ cm5.put("tx", "seq", "3");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
+
+ // ensure rejected mutations did not write
+ Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+ scanner.fetchColumn(new Text("name"), new Text("last"));
+ scanner.setRange(new Range("99006"));
+ Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("Doe", entry.getValue().toString());
- // test w/ two conditions that are met
- ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("Doe"));
- cm6.put("name", "last", "DOE");
- cm6.put("tx", "seq", "3");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
+ // test w/ two conditions that are met
+ ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("Doe"));
+ cm6.put("name", "last", "DOE");
+ cm6.put("tx", "seq", "3");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("DOE", entry.getValue().toString());
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("DOE", entry.getValue().toString());
- // test a conditional mutation that deletes
- ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("3"));
- cm7.putDelete("name", "last");
- cm7.putDelete("name", "first");
- cm7.putDelete("tx", "seq");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
+ // test a conditional mutation that deletes
+ ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("3"));
+ cm7.putDelete("name", "last");
+ cm7.putDelete("name", "first");
+ cm7.putDelete("tx", "seq");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
- Assert.assertFalse("Did not expect to find any results", scanner.iterator().hasNext());
+ Assert.assertFalse("Did not expect to find any results", scanner.iterator().hasNext());
- // add the row back
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
- Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
+ // add the row back
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+ Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("doe", entry.getValue().toString());
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("doe", entry.getValue().toString());
+ }
}
@Test
@@ -242,74 +243,74 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().create(tableName);
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(auths));
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(auths))) {
- ColumnVisibility cva = new ColumnVisibility("A");
- ColumnVisibility cvb = new ColumnVisibility("B");
+ ColumnVisibility cva = new ColumnVisibility("A");
+ ColumnVisibility cvb = new ColumnVisibility("B");
- ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva));
- cm0.put("name", "last", cva, "doe");
- cm0.put("name", "first", cva, "john");
- cm0.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+ ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva));
+ cm0.put("name", "last", cva, "doe");
+ cm0.put("name", "first", cva, "john");
+ cm0.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
- Scanner scanner = conn.createScanner(tableName, auths);
- scanner.setRange(new Range("99006"));
- // TODO verify all columns
- scanner.fetchColumn(new Text("tx"), new Text("seq"));
- Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("1", entry.getValue().toString());
- long ts = entry.getKey().getTimestamp();
-
- // test wrong colf
- ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("txA", "seq").setVisibility(cva).setValue("1"));
- cm1.put("name", "last", cva, "Doe");
- cm1.put("name", "first", cva, "John");
- cm1.put("tx", "seq", cva, "2");
- Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
-
- // test wrong colq
- ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seqA").setVisibility(cva).setValue("1"));
- cm2.put("name", "last", cva, "Doe");
- cm2.put("name", "first", cva, "John");
- cm2.put("tx", "seq", cva, "2");
- Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
-
- // test wrong colv
- ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
- cm3.put("name", "last", cva, "Doe");
- cm3.put("name", "first", cva, "John");
- cm3.put("tx", "seq", cva, "2");
- Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
-
- // test wrong timestamp
- ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts + 1).setValue("1"));
- cm4.put("name", "last", cva, "Doe");
- cm4.put("name", "first", cva, "John");
- cm4.put("tx", "seq", cva, "2");
- Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
-
- // test wrong timestamp
- ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts - 1).setValue("1"));
- cm5.put("name", "last", cva, "Doe");
- cm5.put("name", "first", cva, "John");
- cm5.put("tx", "seq", cva, "2");
- Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
-
- // ensure no updates were made
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("1", entry.getValue().toString());
-
- // set all columns correctly
- ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts).setValue("1"));
- cm6.put("name", "last", cva, "Doe");
- cm6.put("name", "first", cva, "John");
- cm6.put("tx", "seq", cva, "2");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
-
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("2", entry.getValue().toString());
+ Scanner scanner = conn.createScanner(tableName, auths);
+ scanner.setRange(new Range("99006"));
+ // TODO verify all columns
+ scanner.fetchColumn(new Text("tx"), new Text("seq"));
+ Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("1", entry.getValue().toString());
+ long ts = entry.getKey().getTimestamp();
+
+ // test wrong colf
+ ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("txA", "seq").setVisibility(cva).setValue("1"));
+ cm1.put("name", "last", cva, "Doe");
+ cm1.put("name", "first", cva, "John");
+ cm1.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
+
+ // test wrong colq
+ ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seqA").setVisibility(cva).setValue("1"));
+ cm2.put("name", "last", cva, "Doe");
+ cm2.put("name", "first", cva, "John");
+ cm2.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
+
+ // test wrong colv
+ ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
+ cm3.put("name", "last", cva, "Doe");
+ cm3.put("name", "first", cva, "John");
+ cm3.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
+
+ // test wrong timestamp
+ ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts + 1).setValue("1"));
+ cm4.put("name", "last", cva, "Doe");
+ cm4.put("name", "first", cva, "John");
+ cm4.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
+
+ // test wrong timestamp
+ ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts - 1).setValue("1"));
+ cm5.put("name", "last", cva, "Doe");
+ cm5.put("name", "first", cva, "John");
+ cm5.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
+
+ // ensure no updates were made
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("1", entry.getValue().toString());
+
+ // set all columns correctly
+ ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts).setValue("1"));
+ cm6.put("name", "last", cva, "Doe");
+ cm6.put("name", "first", cva, "John");
+ cm6.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("2", entry.getValue().toString());
+ }
}
@Test
@@ -327,87 +328,86 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
Authorizations filteredAuths = new Authorizations("A");
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(filteredAuths));
-
ColumnVisibility cva = new ColumnVisibility("A");
ColumnVisibility cvb = new ColumnVisibility("B");
ColumnVisibility cvc = new ColumnVisibility("C");
- // User has authorization, but didn't include it in the writer
- ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb));
- cm0.put("name", "last", cva, "doe");
- cm0.put("name", "first", cva, "john");
- cm0.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm0).getStatus());
-
- ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
- cm1.put("name", "last", cva, "doe");
- cm1.put("name", "first", cva, "john");
- cm1.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm1).getStatus());
-
- // User does not have the authorization
- ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc));
- cm2.put("name", "last", cva, "doe");
- cm2.put("name", "first", cva, "john");
- cm2.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm2).getStatus());
-
- ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc).setValue("1"));
- cm3.put("name", "last", cva, "doe");
- cm3.put("name", "first", cva, "john");
- cm3.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm3).getStatus());
-
- // if any visibility is bad, good visibilities don't override
- ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva));
-
- cm4.put("name", "last", cva, "doe");
- cm4.put("name", "first", cva, "john");
- cm4.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm4).getStatus());
-
- ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"), new Condition("tx", "seq")
- .setVisibility(cva).setValue("1"));
- cm5.put("name", "last", cva, "doe");
- cm5.put("name", "first", cva, "john");
- cm5.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm5).getStatus());
-
- ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"),
- new Condition("tx", "seq").setVisibility(cva));
- cm6.put("name", "last", cva, "doe");
- cm6.put("name", "first", cva, "john");
- cm6.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm6).getStatus());
-
- ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)
- .setValue("1"));
- cm7.put("name", "last", cva, "doe");
- cm7.put("name", "first", cva, "john");
- cm7.put("tx", "seq", cva, "1");
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
-
- cw.close();
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(filteredAuths))) {
+
+ // User has authorization, but didn't include it in the writer
+ ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb));
+ cm0.put("name", "last", cva, "doe");
+ cm0.put("name", "first", cva, "john");
+ cm0.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm0).getStatus());
+
+ ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
+ cm1.put("name", "last", cva, "doe");
+ cm1.put("name", "first", cva, "john");
+ cm1.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm1).getStatus());
+
+ // User does not have the authorization
+ ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc));
+ cm2.put("name", "last", cva, "doe");
+ cm2.put("name", "first", cva, "john");
+ cm2.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm2).getStatus());
+
+ ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc).setValue("1"));
+ cm3.put("name", "last", cva, "doe");
+ cm3.put("name", "first", cva, "john");
+ cm3.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm3).getStatus());
+
+ // if any visibility is bad, good visibilities don't override
+ ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva));
+
+ cm4.put("name", "last", cva, "doe");
+ cm4.put("name", "first", cva, "john");
+ cm4.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm4).getStatus());
+
+ ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"), new Condition("tx", "seq")
+ .setVisibility(cva).setValue("1"));
+ cm5.put("name", "last", cva, "doe");
+ cm5.put("name", "first", cva, "john");
+ cm5.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm5).getStatus());
+
+ ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"),
+ new Condition("tx", "seq").setVisibility(cva));
+ cm6.put("name", "last", cva, "doe");
+ cm6.put("name", "first", cva, "john");
+ cm6.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm6).getStatus());
+
+ ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)
+ .setValue("1"));
+ cm7.put("name", "last", cva, "doe");
+ cm7.put("name", "first", cva, "john");
+ cm7.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
+
+ }
// test passing auths that exceed users configured auths
Authorizations exceedingAuths = new Authorizations("A", "B", "D");
- ConditionalWriter cw2 = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(exceedingAuths));
+ try (ConditionalWriter cw2 = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(exceedingAuths))) {
- ConditionalMutation cm8 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)
- .setValue("1"));
- cm8.put("name", "last", cva, "doe");
- cm8.put("name", "first", cva, "john");
- cm8.put("tx", "seq", cva, "1");
+ ConditionalMutation cm8 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)
+ .setValue("1"));
+ cm8.put("name", "last", cva, "doe");
+ cm8.put("name", "first", cva, "john");
+ cm8.put("tx", "seq", cva, "1");
- try {
- Status status = cw2.write(cm8).getStatus();
- Assert.fail("Writing mutation with Authorizations the user doesn't have should fail. Got status: " + status);
- } catch (AccumuloSecurityException ase) {
- // expected, check specific failure?
- } finally {
- cw2.close();
+ try {
+ Status status = cw2.write(cm8).getStatus();
+ Assert.fail("Writing mutation with Authorizations the user doesn't have should fail. Got status: " + status);
+ } catch (AccumuloSecurityException ase) {
+ // expected, check specific failure?
+ }
}
}
@@ -424,21 +424,20 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
Scanner scanner = conn.createScanner(tableName + "_clone", new Authorizations());
- ConditionalWriter cw = conn.createConditionalWriter(tableName + "_clone", new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName + "_clone", new ConditionalWriterConfig())) {
- ConditionalMutation cm0 = new ConditionalMutation("99006+", new Condition("tx", "seq"));
- cm0.put("tx", "seq", "1");
+ ConditionalMutation cm0 = new ConditionalMutation("99006+", new Condition("tx", "seq"));
+ cm0.put("tx", "seq", "1");
- Assert.assertEquals(Status.VIOLATED, cw.write(cm0).getStatus());
- Assert.assertFalse("Should find no results in the table is mutation result was violated", scanner.iterator().hasNext());
-
- ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq"));
- cm1.put("tx", "seq", "1");
+ Assert.assertEquals(Status.VIOLATED, cw.write(cm0).getStatus());
+ Assert.assertFalse("Should find no results in the table is mutation result was violated", scanner.iterator().hasNext());
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
- Assert.assertTrue("Accepted result should be returned when reading table", scanner.iterator().hasNext());
+ ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq"));
+ cm1.put("tx", "seq", "1");
- cw.close();
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+ Assert.assertTrue("Accepted result should be returned when reading table", scanner.iterator().hasNext());
+ }
}
@Test
@@ -488,55 +487,55 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
Assert.assertEquals("3", entry.getValue().toString());
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
- ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3"));
- cm0.put("count", "comments", "1");
- Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("3", entry.getValue().toString());
+ ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3"));
+ cm0.put("count", "comments", "1");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("3", entry.getValue().toString());
- ConditionalMutation cm1 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("3"));
- cm1.put("count", "comments", "1");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("4", entry.getValue().toString());
+ ConditionalMutation cm1 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("3"));
+ cm1.put("count", "comments", "1");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("4", entry.getValue().toString());
- ConditionalMutation cm2 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("4"));
- cm2.put("count", "comments", "1");
- Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("4", entry.getValue().toString());
+ ConditionalMutation cm2 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("4"));
+ cm2.put("count", "comments", "1");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("4", entry.getValue().toString());
- // run test with multiple iterators passed in same batch and condition with two iterators
+ // run test with multiple iterators passed in same batch and condition with two iterators
- ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("4"));
- cm3.put("count", "comments", "1");
+ ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("4"));
+ cm3.put("count", "comments", "1");
- ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count2", "comments").setIterators(iterConfig2).setValue("2"));
- cm4.put("count2", "comments", "1");
+ ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count2", "comments").setIterators(iterConfig2).setValue("2"));
+ cm4.put("count2", "comments", "1");
- ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count2", "comments").setIterators(iterConfig2, iterConfig3).setValue("2"));
- cm5.put("count2", "comments", "1");
+ ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count2", "comments").setIterators(iterConfig2, iterConfig3).setValue(
+ "2"));
+ cm5.put("count2", "comments", "1");
- Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator());
- Map<String,Status> actual = new HashMap<String,Status>();
+ Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator());
+ Map<String,Status> actual = new HashMap<String,Status>();
- while (results.hasNext()) {
- Result result = results.next();
- String k = new String(result.getMutation().getRow());
- Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k));
- actual.put(k, result.getStatus());
- }
-
- Map<String,Status> expected = new HashMap<String,Status>();
- expected.put("ACCUMULO-1000", Status.ACCEPTED);
- expected.put("ACCUMULO-1001", Status.ACCEPTED);
- expected.put("ACCUMULO-1002", Status.REJECTED);
+ while (results.hasNext()) {
+ Result result = results.next();
+ String k = new String(result.getMutation().getRow());
+ Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k));
+ actual.put(k, result.getStatus());
+ }
- Assert.assertEquals(expected, actual);
+ Map<String,Status> expected = new HashMap<String,Status>();
+ expected.put("ACCUMULO-1000", Status.ACCEPTED);
+ expected.put("ACCUMULO-1001", Status.ACCEPTED);
+ expected.put("ACCUMULO-1002", Status.REJECTED);
- cw.close();
+ Assert.assertEquals(expected, actual);
+ }
}
public static class AddingIterator extends WrappingIterator {
@@ -611,62 +610,59 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().offline(tableName, true);
conn.tableOperations().online(tableName, true);
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
-
- ConditionalMutation cm6 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("8"));
- cm6.put("count", "comments", "7");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
-
- Scanner scanner = conn.createScanner(tableName, new Authorizations());
- scanner.setRange(new Range("ACCUMULO-1000"));
- scanner.fetchColumn(new Text("count"), new Text("comments"));
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
- Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("9", entry.getValue().toString());
+ ConditionalMutation cm6 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("8"));
+ cm6.put("count", "comments", "7");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
- ConditionalMutation cm7 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("27"));
- cm7.put("count", "comments", "8");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
+ Scanner scanner = conn.createScanner(tableName, new Authorizations());
+ scanner.setRange(new Range("ACCUMULO-1000"));
+ scanner.fetchColumn(new Text("count"), new Text("comments"));
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("10", entry.getValue().toString());
+ Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("9", entry.getValue().toString());
- ConditionalMutation cm8 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2, aiConfig3).setValue("35"));
- cm8.put("count", "comments", "9");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm8).getStatus());
+ ConditionalMutation cm7 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("27"));
+ cm7.put("count", "comments", "8");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("11", entry.getValue().toString());
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("10", entry.getValue().toString());
- ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("33"));
- cm3.put("count", "comments", "3");
+ ConditionalMutation cm8 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2, aiConfig3).setValue("35"));
+ cm8.put("count", "comments", "9");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm8).getStatus());
- ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count", "comments").setIterators(aiConfig3).setValue("14"));
- cm4.put("count", "comments", "3");
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("11", entry.getValue().toString());
- ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count", "comments").setIterators(aiConfig3).setValue("10"));
- cm5.put("count", "comments", "3");
+ ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("33"));
+ cm3.put("count", "comments", "3");
- Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator());
- Map<String,Status> actual = new HashMap<String,Status>();
+ ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count", "comments").setIterators(aiConfig3).setValue("14"));
+ cm4.put("count", "comments", "3");
- while (results.hasNext()) {
- Result result = results.next();
- String k = new String(result.getMutation().getRow());
- Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k));
- actual.put(k, result.getStatus());
- }
+ ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count", "comments").setIterators(aiConfig3).setValue("10"));
+ cm5.put("count", "comments", "3");
- cw.close();
+ Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator());
+ Map<String,Status> actual = new HashMap<String,Status>();
- Map<String,Status> expected = new HashMap<String,Status>();
- expected.put("ACCUMULO-1000", Status.ACCEPTED);
- expected.put("ACCUMULO-1001", Status.ACCEPTED);
- expected.put("ACCUMULO-1002", Status.REJECTED);
+ while (results.hasNext()) {
+ Result result = results.next();
+ String k = new String(result.getMutation().getRow());
+ Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k));
+ actual.put(k, result.getStatus());
+ }
- Assert.assertEquals(expected, actual);
+ Map<String,Status> expected = new HashMap<String,Status>();
+ expected.put("ACCUMULO-1000", Status.ACCEPTED);
+ expected.put("ACCUMULO-1001", Status.ACCEPTED);
+ expected.put("ACCUMULO-1002", Status.REJECTED);
- cw.close();
+ Assert.assertEquals(expected, actual);
+ }
}
@Test
@@ -701,81 +697,80 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
cm2.put("tx", "seq", cvab, "1");
mutations.add(cm2);
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
- Iterator<Result> results = cw.write(mutations.iterator());
- int count = 0;
- while (results.hasNext()) {
- Result result = results.next();
- Assert.assertEquals(Status.ACCEPTED, result.getStatus());
- count++;
- }
-
- Assert.assertEquals(3, count);
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")))) {
+ Iterator<Result> results = cw.write(mutations.iterator());
+ int count = 0;
+ while (results.hasNext()) {
+ Result result = results.next();
+ Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+ count++;
+ }
- Scanner scanner = conn.createScanner(tableName, new Authorizations("A"));
- scanner.fetchColumn(new Text("tx"), new Text("seq"));
+ Assert.assertEquals(3, count);
- for (String row : new String[] {"99006", "59056", "19059"}) {
- scanner.setRange(new Range(row));
- Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("1", entry.getValue().toString());
- }
+ Scanner scanner = conn.createScanner(tableName, new Authorizations("A"));
+ scanner.fetchColumn(new Text("tx"), new Text("seq"));
- TreeSet<Text> splits = new TreeSet<Text>();
- splits.add(new Text("7"));
- splits.add(new Text("3"));
- conn.tableOperations().addSplits(tableName, splits);
+ for (String row : new String[] {"99006", "59056", "19059"}) {
+ scanner.setRange(new Range(row));
+ Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("1", entry.getValue().toString());
+ }
- mutations.clear();
+ TreeSet<Text> splits = new TreeSet<Text>();
+ splits.add(new Text("7"));
+ splits.add(new Text("3"));
+ conn.tableOperations().addSplits(tableName, splits);
+
+ mutations.clear();
+
+ ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvab).setValue("1"));
+ cm3.put("name", "last", cvab, "Doe");
+ cm3.put("tx", "seq", cvab, "2");
+ mutations.add(cm3);
+
+ ConditionalMutation cm4 = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvab));
+ cm4.put("name", "last", cvab, "Doe");
+ cm4.put("tx", "seq", cvab, "1");
+ mutations.add(cm4);
+
+ ConditionalMutation cm5 = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvab).setValue("2"));
+ cm5.put("name", "last", cvab, "Doe");
+ cm5.put("tx", "seq", cvab, "3");
+ mutations.add(cm5);
+
+ results = cw.write(mutations.iterator());
+ int accepted = 0;
+ int rejected = 0;
+ while (results.hasNext()) {
+ Result result = results.next();
+ if (new String(result.getMutation().getRow()).equals("99006")) {
+ Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+ accepted++;
+ } else {
+ Assert.assertEquals(Status.REJECTED, result.getStatus());
+ rejected++;
+ }
+ }
- ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvab).setValue("1"));
- cm3.put("name", "last", cvab, "Doe");
- cm3.put("tx", "seq", cvab, "2");
- mutations.add(cm3);
+ Assert.assertEquals("Expected only one accepted conditional mutation", 1, accepted);
+ Assert.assertEquals("Expected two rejected conditional mutations", 2, rejected);
- ConditionalMutation cm4 = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvab));
- cm4.put("name", "last", cvab, "Doe");
- cm4.put("tx", "seq", cvab, "1");
- mutations.add(cm4);
-
- ConditionalMutation cm5 = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvab).setValue("2"));
- cm5.put("name", "last", cvab, "Doe");
- cm5.put("tx", "seq", cvab, "3");
- mutations.add(cm5);
-
- results = cw.write(mutations.iterator());
- int accepted = 0;
- int rejected = 0;
- while (results.hasNext()) {
- Result result = results.next();
- if (new String(result.getMutation().getRow()).equals("99006")) {
- Assert.assertEquals(Status.ACCEPTED, result.getStatus());
- accepted++;
- } else {
- Assert.assertEquals(Status.REJECTED, result.getStatus());
- rejected++;
+ for (String row : new String[] {"59056", "19059"}) {
+ scanner.setRange(new Range(row));
+ Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("1", entry.getValue().toString());
}
- }
- Assert.assertEquals("Expected only one accepted conditional mutation", 1, accepted);
- Assert.assertEquals("Expected two rejected conditional mutations", 2, rejected);
-
- for (String row : new String[] {"59056", "19059"}) {
- scanner.setRange(new Range(row));
+ scanner.setRange(new Range("99006"));
Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("1", entry.getValue().toString());
- }
-
- scanner.setRange(new Range("99006"));
- Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("2", entry.getValue().toString());
-
- scanner.clearColumns();
- scanner.fetchColumn(new Text("name"), new Text("last"));
- entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("Doe", entry.getValue().toString());
+ Assert.assertEquals("2", entry.getValue().toString());
- cw.close();
+ scanner.clearColumns();
+ scanner.fetchColumn(new Text("name"), new Text("last"));
+ entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("Doe", entry.getValue().toString());
+ }
}
@Test
@@ -810,45 +805,44 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
cml.add(cm);
}
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
- Iterator<Result> results = cw.write(cml.iterator());
+ Iterator<Result> results = cw.write(cml.iterator());
- int count = 0;
+ int count = 0;
- // TODO check got each row back
- while (results.hasNext()) {
- Result result = results.next();
- Assert.assertEquals(Status.ACCEPTED, result.getStatus());
- count++;
- }
+ // TODO check got each row back
+ while (results.hasNext()) {
+ Result result = results.next();
+ Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+ count++;
+ }
- Assert.assertEquals("Did not receive the expected number of results", num, count);
+ Assert.assertEquals("Did not receive the expected number of results", num, count);
- ArrayList<ConditionalMutation> cml2 = new ArrayList<ConditionalMutation>(num);
+ ArrayList<ConditionalMutation> cml2 = new ArrayList<ConditionalMutation>(num);
- for (int i = 0; i < num; i++) {
- ConditionalMutation cm = new ConditionalMutation(rows.get(i), new Condition("meta", "seq").setValue("1"));
+ for (int i = 0; i < num; i++) {
+ ConditionalMutation cm = new ConditionalMutation(rows.get(i), new Condition("meta", "seq").setValue("1"));
- cm.put("meta", "seq", "2");
- cm.put("meta", "tx", UUID.randomUUID().toString());
-
- cml2.add(cm);
- }
+ cm.put("meta", "seq", "2");
+ cm.put("meta", "tx", UUID.randomUUID().toString());
- count = 0;
+ cml2.add(cm);
+ }
- results = cw.write(cml2.iterator());
+ count = 0;
- while (results.hasNext()) {
- Result result = results.next();
- Assert.assertEquals(Status.ACCEPTED, result.getStatus());
- count++;
- }
+ results = cw.write(cml2.iterator());
- Assert.assertEquals("Did not receive the expected number of results", num, count);
+ while (results.hasNext()) {
+ Result result = results.next();
+ Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+ count++;
+ }
- cw.close();
+ Assert.assertEquals("Did not receive the expected number of results", num, count);
+ }
}
@Test
@@ -901,33 +895,32 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
cm3.put("tx", "seq", cvaob, "2");
mutations.add(cm3);
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
- Iterator<Result> results = cw.write(mutations.iterator());
- HashSet<String> rows = new HashSet<String>();
- while (results.hasNext()) {
- Result result = results.next();
- String row = new String(result.getMutation().getRow());
- if (row.equals("19059")) {
- Assert.assertEquals(Status.ACCEPTED, result.getStatus());
- } else if (row.equals("59056")) {
- Assert.assertEquals(Status.INVISIBLE_VISIBILITY, result.getStatus());
- } else if (row.equals("99006")) {
- Assert.assertEquals(Status.VIOLATED, result.getStatus());
- } else if (row.equals("90909")) {
- Assert.assertEquals(Status.REJECTED, result.getStatus());
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")))) {
+ Iterator<Result> results = cw.write(mutations.iterator());
+ HashSet<String> rows = new HashSet<String>();
+ while (results.hasNext()) {
+ Result result = results.next();
+ String row = new String(result.getMutation().getRow());
+ if (row.equals("19059")) {
+ Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+ } else if (row.equals("59056")) {
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, result.getStatus());
+ } else if (row.equals("99006")) {
+ Assert.assertEquals(Status.VIOLATED, result.getStatus());
+ } else if (row.equals("90909")) {
+ Assert.assertEquals(Status.REJECTED, result.getStatus());
+ }
+ rows.add(row);
}
- rows.add(row);
- }
- Assert.assertEquals(4, rows.size());
+ Assert.assertEquals(4, rows.size());
- Scanner scanner = conn.createScanner(tableName, new Authorizations("A"));
- scanner.fetchColumn(new Text("tx"), new Text("seq"));
-
- Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
- Assert.assertEquals("1", entry.getValue().toString());
+ Scanner scanner = conn.createScanner(tableName, new Authorizations("A"));
+ scanner.fetchColumn(new Text("tx"), new Text("seq"));
- cw.close();
+ Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+ Assert.assertEquals("1", entry.getValue().toString());
+ }
}
@Test
@@ -939,46 +932,45 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().create(tableName);
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
- ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
- cm1.put("tx", "seq", "1");
- cm1.put("data", "x", "a");
-
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+ ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+ cm1.put("tx", "seq", "1");
+ cm1.put("data", "x", "a");
- ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
- cm2.put("tx", "seq", "2");
- cm2.put("data", "x", "b");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
- ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
- cm3.put("tx", "seq", "2");
- cm3.put("data", "x", "c");
+ ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+ cm2.put("tx", "seq", "2");
+ cm2.put("data", "x", "b");
- ConditionalMutation cm4 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
- cm4.put("tx", "seq", "2");
- cm4.put("data", "x", "d");
+ ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+ cm3.put("tx", "seq", "2");
+ cm3.put("data", "x", "c");
- Iterator<Result> results = cw.write(Arrays.asList(cm2, cm3, cm4).iterator());
+ ConditionalMutation cm4 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+ cm4.put("tx", "seq", "2");
+ cm4.put("data", "x", "d");
- int accepted = 0;
- int rejected = 0;
- int total = 0;
+ Iterator<Result> results = cw.write(Arrays.asList(cm2, cm3, cm4).iterator());
- while (results.hasNext()) {
- Status status = results.next().getStatus();
- if (status == Status.ACCEPTED)
- accepted++;
- if (status == Status.REJECTED)
- rejected++;
- total++;
- }
+ int accepted = 0;
+ int rejected = 0;
+ int total = 0;
- Assert.assertEquals("Expected one accepted result", 1, accepted);
- Assert.assertEquals("Expected two rejected results", 2, rejected);
- Assert.assertEquals("Expected three total results", 3, total);
+ while (results.hasNext()) {
+ Status status = results.next().getStatus();
+ if (status == Status.ACCEPTED)
+ accepted++;
+ if (status == Status.REJECTED)
+ rejected++;
+ total++;
+ }
- cw.close();
+ Assert.assertEquals("Expected one accepted result", 1, accepted);
+ Assert.assertEquals("Expected two rejected results", 2, rejected);
+ Assert.assertEquals("Expected three total results", 3, total);
+ }
}
private static class Stats {
@@ -1073,11 +1065,9 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
@Override
public void run() {
- try {
+ try (Scanner scanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY))) {
Random rand = new Random();
- Scanner scanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY));
-
for (int i = 0; i < 20; i++) {
int numRows = rand.nextInt(10) + 1;
@@ -1105,9 +1095,7 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
Collections.sort(changed);
Assert.assertEquals(changes, changed);
-
}
-
} catch (Exception e) {
log.error("{}", e.getMessage(), e);
failed.set(true);
@@ -1135,54 +1123,56 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
break;
}
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
- ArrayList<ByteSequence> rows = new ArrayList<ByteSequence>();
+ ArrayList<ByteSequence> rows = new ArrayList<ByteSequence>();
- for (int i = 0; i < 1000; i++) {
- rows.add(new ArrayByteSequence(FastFormat.toZeroPaddedString(abs(rand.nextLong()), 16, 16, new byte[0])));
- }
+ for (int i = 0; i < 1000; i++) {
+ rows.add(new ArrayByteSequence(FastFormat.toZeroPaddedString(abs(rand.nextLong()), 16, 16, new byte[0])));
+ }
- ArrayList<ConditionalMutation> mutations = new ArrayList<ConditionalMutation>();
+ ArrayList<ConditionalMutation> mutations = new ArrayList<ConditionalMutation>();
- for (ByteSequence row : rows)
- mutations.add(new Stats(row).toMutation());
+ for (ByteSequence row : rows)
+ mutations.add(new Stats(row).toMutation());
- ArrayList<ByteSequence> rows2 = new ArrayList<ByteSequence>();
- Iterator<Result> results = cw.write(mutations.iterator());
- while (results.hasNext()) {
- Result result = results.next();
- Assert.assertEquals(Status.ACCEPTED, result.getStatus());
- rows2.add(new ArrayByteSequence(result.getMutation().getRow()));
- }
+ ArrayList<ByteSequence> rows2 = new ArrayList<ByteSequence>();
+ Iterator<Result> results = cw.write(mutations.iterator());
+ while (results.hasNext()) {
+ Result result = results.next();
+ Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+ rows2.add(new ArrayByteSequence(result.getMutation().getRow()));
+ }
- Collections.sort(rows);
- Collections.sort(rows2);
+ Collections.sort(rows);
+ Collections.sort(rows2);
- Assert.assertEquals(rows, rows2);
+ Assert.assertEquals(rows, rows2);
- AtomicBoolean failed = new AtomicBoolean(false);
+ AtomicBoolean failed = new AtomicBoolean(false);
- ExecutorService tp = Executors.newFixedThreadPool(5);
- for (int i = 0; i < 5; i++) {
- tp.submit(new MutatorTask(tableName, conn, rows, cw, failed));
- }
+ ExecutorService tp = Executors.newFixedThreadPool(5);
+ for (int i = 0; i < 5; i++) {
+ tp.submit(new MutatorTask(tableName, conn, rows, cw, failed));
+ }
- tp.shutdown();
+ tp.shutdown();
- while (!tp.isTerminated()) {
- tp.awaitTermination(1, TimeUnit.MINUTES);
- }
+ while (!tp.isTerminated()) {
+ tp.awaitTermination(1, TimeUnit.MINUTES);
+ }
- Assert.assertFalse("A MutatorTask failed with an exception", failed.get());
+ Assert.assertFalse("A MutatorTask failed with an exception", failed.get());
+ }
- Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+ try (Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY)) {
- RowIterator rowIter = new RowIterator(scanner);
+ RowIterator rowIter = new RowIterator(scanner);
- while (rowIter.hasNext()) {
- Iterator<Entry<Key,Value>> row = rowIter.next();
- new Stats(row);
+ while (rowIter.hasNext()) {
+ Iterator<Entry<Key,Value>> row = rowIter.next();
+ new Stats(row);
+ }
}
}
@@ -1232,27 +1222,28 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
cm1.put("tx", "seq", "1");
cm1.put("data", "x", "a");
- ConditionalWriter cw1 = conn2.createConditionalWriter(table1, new ConditionalWriterConfig());
- ConditionalWriter cw2 = conn2.createConditionalWriter(table2, new ConditionalWriterConfig());
- ConditionalWriter cw3 = conn2.createConditionalWriter(table3, new ConditionalWriterConfig());
+ try (ConditionalWriter cw1 = conn2.createConditionalWriter(table1, new ConditionalWriterConfig());
+ ConditionalWriter cw2 = conn2.createConditionalWriter(table2, new ConditionalWriterConfig());
+ ConditionalWriter cw3 = conn2.createConditionalWriter(table3, new ConditionalWriterConfig())) {
- // Should be able to conditional-update a table we have R/W on
- Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus());
+ // Should be able to conditional-update a table we have R/W on
+ Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus());
- // Conditional-update to a table we only have read on should fail
- try {
- Status status = cw1.write(cm1).getStatus();
- Assert.fail("Expected exception writing conditional mutation to table the user doesn't have write access to, Got status: " + status);
- } catch (AccumuloSecurityException ase) {
+ // Conditional-update to a table we only have read on should fail
+ try {
+ Status status = cw1.write(cm1).getStatus();
+ Assert.fail("Expected exception writing conditional mutation to table the user doesn't have write access to, Got status: " + status);
+ } catch (AccumuloSecurityException ase) {
- }
+ }
- // Conditional-update to a table we only have writer on should fail
- try {
- Status status = cw2.write(cm1).getStatus();
- Assert.fail("Expected exception writing conditional mutation to table the user doesn't have read access to. Got status: " + status);
- } catch (AccumuloSecurityException ase) {
+ // Conditional-update to a table we only have writer on should fail
+ try {
+ Status status = cw2.write(cm1).getStatus();
+ Assert.fail("Expected exception writing conditional mutation to table the user doesn't have read access to. Got status: " + status);
+ } catch (AccumuloSecurityException ase) {
+ }
}
}
@@ -1264,45 +1255,44 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setTimeout(3, TimeUnit.SECONDS));
-
- ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
- cm1.put("tx", "seq", "1");
- cm1.put("data", "x", "a");
-
- Assert.assertEquals(cw.write(cm1).getStatus(), Status.ACCEPTED);
+ try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setTimeout(3, TimeUnit.SECONDS))) {
- IteratorSetting is = new IteratorSetting(5, SlowIterator.class);
- SlowIterator.setSeekSleepTime(is, 5000);
+ ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+ cm1.put("tx", "seq", "1");
+ cm1.put("data", "x", "a");
- ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1").setIterators(is));
- cm2.put("tx", "seq", "2");
- cm2.put("data", "x", "b");
+ Assert.assertEquals(cw.write(cm1).getStatus(), Status.ACCEPTED);
- Assert.assertEquals(cw.write(cm2).getStatus(), Status.UNKNOWN);
+ IteratorSetting is = new IteratorSetting(5, SlowIterator.class);
+ SlowIterator.setSeekSleepTime(is, 5000);
- Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+ ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1").setIterators(is));
+ cm2.put("tx", "seq", "2");
+ cm2.put("data", "x", "b");
- for (Entry<Key,Value> entry : scanner) {
- String cf = entry.getKey().getColumnFamilyData().toString();
- String cq = entry.getKey().getColumnQualifierData().toString();
- String val = entry.getValue().toString();
+ Assert.assertEquals(cw.write(cm2).getStatus(), Status.UNKNOWN);
- if (cf.equals("tx") && cq.equals("seq"))
- Assert.assertEquals("Unexpected value in tx:seq", "1", val);
- else if (cf.equals("data") && cq.equals("x"))
- Assert.assertEquals("Unexpected value in data:x", "a", val);
- else
- Assert.fail("Saw unexpected column family and qualifier: " + entry);
- }
+ Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
- ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
- cm3.put("tx", "seq", "2");
- cm3.put("data", "x", "b");
+ for (Entry<Key,Value> entry : scanner) {
+ String cf = entry.getKey().getColumnFamilyData().toString();
+ String cq = entry.getKey().getColumnQualifierData().toString();
+ String val = entry.getValue().toString();
+
+ if (cf.equals("tx") && cq.equals("seq"))
+ Assert.assertEquals("Unexpected value in tx:seq", "1", val);
+ else if (cf.equals("data") && cq.equals("x"))
+ Assert.assertEquals("Unexpected value in data:x", "a", val);
+ else
+ Assert.fail("Saw unexpected column family and qualifier: " + entry);
+ }
- Assert.assertEquals(cw.write(cm3).getStatus(), Status.ACCEPTED);
+ ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+ cm3.put("tx", "seq", "2");
+ cm3.put("data", "x", "b");
- cw.close();
+ Assert.assertEquals(cw.write(cm3).getStatus(), Status.ACCEPTED);
+ }
}
@Test
@@ -1317,21 +1307,22 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig())) {
- conn.tableOperations().delete(table);
+ conn.tableOperations().delete(table);
- ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
- cm1.put("tx", "seq", "1");
- cm1.put("data", "x", "a");
+ ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+ cm1.put("tx", "seq", "1");
+ cm1.put("data", "x", "a");
- Result result = cw.write(cm1);
+ Result result = cw.write(cm1);
- try {
- Status status = result.getStatus();
- Assert.fail("Expected exception writing conditional mutation to deleted table. Got status: " + status);
- } catch (AccumuloException ae) {
- Assert.assertEquals(TableDeletedException.class, ae.getCause().getClass());
+ try {
+ Status status = result.getStatus();
+ Assert.fail("Expected exception writing conditional mutation to deleted table. Got status: " + status);
+ } catch (AccumuloException ae) {
+ Assert.assertEquals(TableDeletedException.class, ae.getCause().getClass());
+ }
}
}
@@ -1342,29 +1333,28 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig())) {
- conn.tableOperations().offline(table, true);
+ conn.tableOperations().offline(table, true);
- ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
- cm1.put("tx", "seq", "1");
- cm1.put("data", "x", "a");
+ ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+ cm1.put("tx", "seq", "1");
+ cm1.put("data", "x", "a");
- Result result = cw.write(cm1);
-
- try {
- Status status = result.getStatus();
- Assert.fail("Expected exception writing conditional mutation to offline table. Got status: " + status);
- } catch (AccumuloException ae) {
- Assert.assertEquals(TableOfflineException.class, ae.getCause().getClass());
- }
+ Result result = cw.write(cm1);
- cw.close();
+ try {
+ Status status = result.getStatus();
+ Assert.fail("Expected exception writing conditional mutation to offline table. Got status: " + status);
+ } catch (AccumuloException ae) {
+ Assert.assertEquals(TableOfflineException.class, ae.getCause().getClass());
+ }
- try {
- conn.createConditionalWriter(table, new ConditionalWriterConfig());
- Assert.fail("Expected exception creating conditional writer to offline table");
- } catch (TableOfflineException e) {}
+ try {
+ conn.createConditionalWriter(table, new ConditionalWriterConfig());
+ Assert.fail("Expected exception creating conditional writer to offline table");
+ } catch (TableOfflineException e) {}
+ }
}
@Test
@@ -1374,24 +1364,24 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig())) {
- IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class);
+ IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class);
- ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq").setIterators(iterSetting));
- cm1.put("tx", "seq", "1");
- cm1.put("data", "x", "a");
+ ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq").setIterators(iterSetting));
+ cm1.put("tx", "seq", "1");
+ cm1.put("data", "x", "a");
- Result result = cw.write(cm1);
+ Result result = cw.write(cm1);
- try {
- Status status = result.getStatus();
- Assert.fail("Expected exception using iterator which throws an error, Got status: " + status);
- } catch (AccumuloException ae) {
+ try {
+ Status status = result.getStatus();
+ Assert.fail("Expected exception using iterator which throws an error, Got status: " + status);
+ } catch (AccumuloException ae) {
- }
+ }
- cw.close();
+ }
}
@Test(expected = IllegalArgumentException.class)
@@ -1401,13 +1391,14 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
+ try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig())) {
- ConditionalMutation cm1 = new ConditionalMutation("r1");
- cm1.put("tx", "seq", "1");
- cm1.put("data", "x", "a");
+ ConditionalMutation cm1 = new ConditionalMutation("r1");
+ cm1.put("tx", "seq", "1");
+ cm1.put("data", "x", "a");
- cw.write(cm1);
+ cw.write(cm1);
+ }
}
@Test
@@ -1431,15 +1422,16 @@ public class ConditionalWriterIT extends AccumuloClusterHarness {
DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig());
sleepUninterruptibly(1, TimeUnit.SECONDS);
Span root = Trace.on("traceTest");
- ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
-
- // mutation conditional on column tx:seq not exiting
- ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
- cm0.put("name", "last", "doe");
- cm0.put("name", "first", "john");
- cm0.put("tx", "seq", "1");
- Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
- root.stop();
+ try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
+
+ // mutation conditional on column tx:seq not exiting
+ ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
+ cm0.put("name", "last", "doe");
+ cm0.put("name", "first", "john");
+ cm0.put("tx", "seq", "1");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+ root.stop();
+ }
final Scanner scanner = conn.createScanner("trace", Authorizations.EMPTY);
scanner.setRange(new Range(new Text(Long.toHexString(root.traceId()))));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 5aa0c84..967ac24 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -44,8 +44,8 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;
@@ -76,30 +76,29 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness {
private void runLatencyTest(String tableName) throws Exception {
// should automatically flush after 2 seconds
- BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS));
- Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
+ try (BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS))) {
+ Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
- Mutation m = new Mutation(new Text(String.format("r_%10d", 1)));
- m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(UTF_8)));
- bw.addMutation(m);
+ Mutation m = new Mutation(new Text(String.format("r_%10d", 1)));
+ m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(UTF_8)));
+ bw.addMutation(m);
- sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- int count = Iterators.size(scanner.iterator());
+ int count = Iterators.size(scanner.iterator());
- if (count != 0) {
- throw new Exception("Flushed too soon");
- }
+ if (count != 0) {
+ throw new Exception("Flushed too soon");
+ }
- sleepUninterruptibly(1500, TimeUnit.MILLISECONDS);
+ sleepUninterruptibly(1500, TimeUnit.MILLISECONDS);
- count = Iterators.size(scanner.iterator());
+ count = Iterators.size(scanner.iterator());
- if (count != 1) {
- throw new Exception("Did not flush");
+ if (count != 1) {
+ throw new Exception("Did not flush");
+ }
}
-
- bw.close();
}
private void runFlushTest(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException,
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index edf73eb..8b091ca 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -405,40 +405,40 @@ public class ReadWriteIT extends AccumuloClusterHarness {
ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName);
verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName);
connector.tableOperations().flush(tableName, null, null, true);
- BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1);
- String tableId = connector.tableOperations().tableIdMap().get(tableName);
- bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<"))));
- bscanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- boolean foundFile = false;
- for (Entry<Key,Value> entry : bscanner) {
- foundFile = true;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream newOut = new PrintStream(baos);
- PrintStream oldOut = System.out;
- try {
- System.setOut(newOut);
- List<String> args = new ArrayList<>();
- args.add(entry.getKey().getColumnQualifier().toString());
- if (ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
- args.add("--config");
- StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster;
- String hadoopConfDir = sac.getHadoopConfDir();
- args.add(new Path(hadoopConfDir, "core-site.xml").toString());
- args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString());
+ try (BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1)) {
+ String tableId = connector.tableOperations().tableIdMap().get(tableName);
+ bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<"))));
+ bscanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ boolean foundFile = false;
+ for (Entry<Key,Value> entry : bscanner) {
+ foundFile = true;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream newOut = new PrintStream(baos);
+ PrintStream oldOut = System.out;
+ try {
+ System.setOut(newOut);
+ List<String> args = new ArrayList<>();
+ args.add(entry.getKey().getColumnQualifier().toString());
+ if (ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ args.add("--config");
+ StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster;
+ String hadoopConfDir = sac.getHadoopConfDir();
+ args.add(new Path(hadoopConfDir, "core-site.xml").toString());
+ args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString());
+ }
+ log.info("Invoking PrintInfo with " + args);
+ PrintInfo.main(args.toArray(new String[args.size()]));
+ newOut.flush();
+ String stdout = baos.toString();
+ assertTrue(stdout.contains("Locality group : g1"));
+ assertTrue(stdout.contains("families : [colf]"));
+ } finally {
+ newOut.close();
+ System.setOut(oldOut);
}
- log.info("Invoking PrintInfo with " + args);
- PrintInfo.main(args.toArray(new String[args.size()]));
- newOut.flush();
- String stdout = baos.toString();
- assertTrue(stdout.contains("Locality group : g1"));
- assertTrue(stdout.contains("families : [colf]"));
- } finally {
- newOut.close();
- System.setOut(oldOut);
}
+ assertTrue(foundFile);
}
- bscanner.close();
- assertTrue(foundFile);
}
@Test
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index c48b1ed..80e5374 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -208,47 +208,49 @@ public class SplitRecoveryIT extends ConfigurableMacBase {
private void ensureTabletHasNoUnexpectedMetadataEntries(AccumuloServerContext context, KeyExtent extent, SortedMap<FileRef,DataFileValue> expectedMapFiles)
throws Exception {
- Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- scanner.setRange(extent.toMetadataRange());
-
- HashSet<ColumnFQ> expectedColumns = new HashSet<ColumnFQ>();
- expectedColumns.add(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN);
- expectedColumns.add(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN);
- expectedColumns.add(TabletsSection.ServerColumnFamily.TIME_COLUMN);
- expectedColumns.add(TabletsSection.ServerColumnFamily.LOCK_COLUMN);
-
- HashSet<Text> expectedColumnFamilies = new HashSet<Text>();
- expectedColumnFamilies.add(DataFileColumnFamily.NAME);
- expectedColumnFamilies.add(TabletsSection.FutureLocationColumnFamily.NAME);
- expectedColumnFamilies.add(TabletsSection.CurrentLocationColumnFamily.NAME);
- expectedColumnFamilies.add(TabletsSection.LastLocationColumnFamily.NAME);
- expectedColumnFamilies.add(TabletsSection.BulkFileColumnFamily.NAME);
-
- Iterator<Entry<Key,Value>> iter = scanner.iterator();
- while (iter.hasNext()) {
- Key key = iter.next().getKey();
-
- if (!key.getRow().equals(extent.getMetadataEntry())) {
- throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key);
- }
+ try (Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ scanner.setRange(extent.toMetadataRange());
+
+ HashSet<ColumnFQ> expectedColumns = new HashSet<ColumnFQ>();
+ expectedColumns.add(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN);
+ expectedColumns.add(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN);
+ expectedColumns.add(TabletsSection.ServerColumnFamily.TIME_COLUMN);
+ expectedColumns.add(TabletsSection.ServerColumnFamily.LOCK_COLUMN);
+
+ HashSet<Text> expectedColumnFamilies = new HashSet<Text>();
+ expectedColumnFamilies.add(DataFileColumnFamily.NAME);
+ expectedColumnFamilies.add(TabletsSection.FutureLocationColumnFamily.NAME);
+ expectedColumnFamilies.add(TabletsSection.CurrentLocationColumnFamily.NAME);
+ expectedColumnFamilies.add(TabletsSection.LastLocationColumnFamily.NAME);
+ expectedColumnFamilies.add(TabletsSection.BulkFileColumnFamily.NAME);
+
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ while (iter.hasNext()) {
+ Key key = iter.next().getKey();
+
+ if (!key.getRow().equals(extent.getMetadataEntry())) {
+ throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key);
+ }
+
+ if (expectedColumnFamilies.contains(key.getColumnFamily())) {
+ continue;
+ }
+
+ if (expectedColumns.remove(new ColumnFQ(key))) {
+ continue;
+ }
- if (expectedColumnFamilies.contains(key.getColumnFamily())) {
- continue;
+ throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key);
}
- if (expectedColumns.remove(new ColumnFQ(key))) {
- continue;
+ System.out.println("expectedColumns " + expectedColumns);
+ if (expectedColumns.size() > 1 || (expectedColumns.size() == 1)) {
+ throw new Exception("Not all expected columns seen " + extent + " " + expectedColumns);
}
- throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key);
+ SortedMap<FileRef,DataFileValue> fixedMapFiles = MetadataTableUtil.getDataFileSizes(extent, context);
+ verifySame(expectedMapFiles, fixedMapFiles);
}
- System.out.println("expectedColumns " + expectedColumns);
- if (expectedColumns.size() > 1 || (expectedColumns.size() == 1)) {
- throw new Exception("Not all expected columns seen " + extent + " " + expectedColumns);
- }
-
- SortedMap<FileRef,DataFileValue> fixedMapFiles = MetadataTableUtil.getDataFileSizes(extent, context);
- verifySame(expectedMapFiles, fixedMapFiles);
}
private void verifySame(SortedMap<FileRef,DataFileValue> datafileSizes, SortedMap<FileRef,DataFileValue> fixedDatafileSizes) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
index 39ef3d8..0c7cfb6 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
@@ -38,19 +38,19 @@ public class ConsistencyCheck extends SelectiveBulkTest {
log.info("Checking " + row);
String user = env.getConnector().whoami();
Authorizations auths = env.getConnector().securityOperations().getUserAuthorizations(user);
- Scanner scanner = env.getConnector().createScanner(Setup.getTableName(), auths);
- scanner = new IsolatedScanner(scanner);
- scanner.setRange(new Range(row));
- scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
- Value v = null;
- Key first = null;
- for (Entry<Key,Value> entry : scanner) {
- if (v == null) {
- v = entry.getValue();
- first = entry.getKey();
+ try (Scanner scanner = new IsolatedScanner(env.getConnector().createScanner(Setup.getTableName(), auths))) {
+ scanner.setRange(new Range(row));
+ scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
+ Value v = null;
+ Key first = null;
+ for (Entry<Key,Value> entry : scanner) {
+ if (v == null) {
+ v = entry.getValue();
+ first = entry.getKey();
+ }
+ if (!v.equals(entry.getValue()))
+ throw new RuntimeException("Inconsistent value at " + entry.getKey() + " was " + entry.getValue() + " should be " + v + " first read at " + first);
}
- if (!v.equals(entry.getValue()))
- throw new RuntimeException("Inconsistent value at " + entry.getKey() + " was " + entry.getValue() + " should be " + v + " first read at " + first);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java
index 35636e4..2f3aacd 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java
@@ -80,56 +80,56 @@ public class Transfer extends Test {
}
// TODO document how data should be read when using ConditionalWriter
- Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY));
-
- scanner.setRange(new Range(bank));
- scanner.fetchColumnFamily(new Text(acct1));
- scanner.fetchColumnFamily(new Text(acct2));
-
- Account a1 = new Account();
- Account a2 = new Account();
- Account a;
-
- for (Entry<Key,Value> entry : scanner) {
- String cf = entry.getKey().getColumnFamilyData().toString();
- String cq = entry.getKey().getColumnQualifierData().toString();
-
- if (cf.equals(acct1))
- a = a1;
- else if (cf.equals(acct2))
- a = a2;
- else
- throw new Exception("Unexpected column fam: " + cf);
-
- if (cq.equals("bal"))
- a.setBal(entry.getValue().toString());
- else if (cq.equals("seq"))
- a.setSeq(entry.getValue().toString());
- else
- throw new Exception("Unexpected column qual: " + cq);
- }
-
- int amt = rand.nextInt(50);
-
- log.debug("transfer req " + bank + " " + amt + " " + acct1 + " " + a1 + " " + acct2 + " " + a2);
-
- if (a1.bal >= amt) {
- ConditionalMutation cm = new ConditionalMutation(bank, new Condition(acct1, "seq").setValue(Utils.getSeq(a1.seq)),
- new Condition(acct2, "seq").setValue(Utils.getSeq(a2.seq)));
- cm.put(acct1, "bal", (a1.bal - amt) + "");
- cm.put(acct2, "bal", (a2.bal + amt) + "");
- cm.put(acct1, "seq", Utils.getSeq(a1.seq + 1));
- cm.put(acct2, "seq", Utils.getSeq(a2.seq + 1));
+ try (Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY))) {
+
+ scanner.setRange(new Range(bank));
+ scanner.fetchColumnFamily(new Text(acct1));
+ scanner.fetchColumnFamily(new Text(acct2));
+
+ Account a1 = new Account();
+ Account a2 = new Account();
+ Account a;
+
+ for (Entry<Key,Value> entry : scanner) {
+ String cf = entry.getKey().getColumnFamilyData().toString();
+ String cq = entry.getKey().getColumnQualifierData().toString();
+
+ if (cf.equals(acct1))
+ a = a1;
+ else if (cf.equals(acct2))
+ a = a2;
+ else
+ throw new Exception("Unexpected column fam: " + cf);
+
+ if (cq.equals("bal"))
+ a.setBal(entry.getValue().toString());
+ else if (cq.equals("seq"))
+ a.setSeq(entry.getValue().toString());
+ else
+ throw new Exception("Unexpected column qual: " + cq);
+ }
- ConditionalWriter cw = (ConditionalWriter) state.get("cw");
- Status status = cw.write(cm).getStatus();
- while (status == Status.UNKNOWN) {
- log.debug("retrying transfer " + status);
- status = cw.write(cm).getStatus();
+ int amt = rand.nextInt(50);
+
+ log.debug("transfer req " + bank + " " + amt + " " + acct1 + " " + a1 + " " + acct2 + " " + a2);
+
+ if (a1.bal >= amt) {
+ ConditionalMutation cm = new ConditionalMutation(bank, new Condition(acct1, "seq").setValue(Utils.getSeq(a1.seq)),
+ new Condition(acct2, "seq").setValue(Utils.getSeq(a2.seq)));
+ cm.put(acct1, "bal", (a1.bal - amt) + "");
+ cm.put(acct2, "bal", (a2.bal + amt) + "");
+ cm.put(acct1, "seq", Utils.getSeq(a1.seq + 1));
+ cm.put(acct2, "seq", Utils.getSeq(a2.seq + 1));
+
+ ConditionalWriter cw = (ConditionalWriter) state.get("cw");
+ Status status = cw.write(cm).getStatus();
+ while (status == Status.UNKNOWN) {
+ log.debug("retrying transfer " + status);
+ status = cw.write(cm).getStatus();
+ }
+ log.debug("transfer result " + bank + " " + status + " " + a1 + " " + a2);
}
- log.debug("transfer result " + bank + " " + status + " " + a1 + " " + a2);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java
index 2690ffc..6c46f73 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java
@@ -53,27 +53,30 @@ public class Verify extends Test {
private void verifyBank(String table, Connector conn, String row, int numAccts) throws TableNotFoundException, Exception {
log.debug("Verifying bank " + row);
- // TODO do not use IsolatedScanner, just enable isolation on scanner
- Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY));
-
- scanner.setRange(new Range(row));
- IteratorSetting iterConf = new IteratorSetting(100, "cqsl", ColumnSliceFilter.class);
- ColumnSliceFilter.setSlice(iterConf, "bal", true, "bal", true);
- scanner.clearScanIterators();
- scanner.addScanIterator(iterConf);
-
int count = 0;
int sum = 0;
int min = Integer.MAX_VALUE;
int max = Integer.MIN_VALUE;
- for (Entry<Key,Value> entry : scanner) {
- int bal = Integer.parseInt(entry.getValue().toString());
- sum += bal;
- if (bal > max)
- max = bal;
- if (bal < min)
- min = bal;
- count++;
+
+ // TODO do not use IsolatedScanner, just enable isolation on scanner
+ try (Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY))) {
+
+ scanner.setRange(new Range(row));
+ IteratorSetting iterConf = new IteratorSetting(100, "cqsl", ColumnSliceFilter.class);
+ ColumnSliceFilter.setSlice(iterConf, "bal", true, "bal", true);
+ scanner.clearScanIterators();
+ scanner.addScanIterator(iterConf);
+
+ for (Entry<Key,Value> entry : scanner) {
+ int bal = Integer.parseInt(entry.getValue().toString());
+ sum += bal;
+ if (bal > max)
+ max = bal;
+ if (bal < min)
+ min = bal;
+ count++;
+ }
+
}
if (count > 0 && sum != numAccts * 100) {
[2/2] accumulo git commit: ACCUMULO-4318 Made writers and scanners
auto closeable
Posted by kt...@apache.org.
ACCUMULO-4318 Made writers and scanners auto closeable
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e67317cb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e67317cb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e67317cb
Branch: refs/heads/1.8
Commit: e67317cb267744cae11872d373223234459600be
Parents: 61a7de4
Author: Keith Turner <ke...@deenlo.com>
Authored: Thu Jun 2 12:26:02 2016 -0400
Committer: Keith Turner <ke...@deenlo.com>
Committed: Thu Jun 2 12:26:02 2016 -0400
----------------------------------------------------------------------
.../accumulo/core/client/BatchWriter.java | 3 +-
.../accumulo/core/client/ConditionalWriter.java | 3 +-
.../accumulo/core/client/ScannerBase.java | 5 +-
.../core/client/impl/ScannerImplTest.java | 4 +
.../impl/TabletServerBatchReaderTest.java | 6 +-
.../examples/simple/reservations/ARS.java | 45 +-
.../server/util/MasterMetadataUtil.java | 57 +-
.../accumulo/server/util/MetadataTableUtil.java | 288 ++---
.../accumulo/gc/SimpleGarbageCollector.java | 4 +-
.../accumulo/master/tableOps/CopyFailed.java | 21 +-
.../apache/accumulo/tserver/TabletServer.java | 14 +-
.../accumulo/test/ConditionalWriterIT.java | 1144 +++++++++---------
.../test/functional/BatchWriterFlushIT.java | 33 +-
.../accumulo/test/functional/ReadWriteIT.java | 62 +-
.../test/functional/SplitRecoveryIT.java | 72 +-
.../test/randomwalk/bulk/ConsistencyCheck.java | 24 +-
.../test/randomwalk/conditional/Transfer.java | 94 +-
.../test/randomwalk/conditional/Verify.java | 37 +-
18 files changed, 959 insertions(+), 957 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
index b4d81aa..95d87c5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Mutation;
* In the event that an MutationsRejectedException exception is thrown by one of the methods on a BatchWriter instance, the user should close the current
* instance and create a new instance. This is a known limitation which will be addressed by ACCUMULO-2990 in the future.
*/
-public interface BatchWriter {
+public interface BatchWriter extends AutoCloseable {
/**
* Queues one mutation to write.
@@ -66,6 +66,7 @@ public interface BatchWriter {
* @throws MutationsRejectedException
* this could be thrown because current or previous mutations failed
*/
+ @Override
void close() throws MutationsRejectedException;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 62244e6..d13dc09 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -28,7 +28,7 @@ import org.apache.accumulo.core.data.ConditionalMutation;
*
* @since 1.6.0
*/
-public interface ConditionalWriter {
+public interface ConditionalWriter extends AutoCloseable {
class Result {
private Status status;
@@ -131,5 +131,6 @@ public interface ConditionalWriter {
/**
* release any resources (like threads pools) used by conditional writer
*/
+ @Override
void close();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 354f6f4..2110050 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.Text;
* This class hosts configuration methods that are shared between different types of scanners.
*
*/
-public interface ScannerBase extends Iterable<Entry<Key,Value>> {
+public interface ScannerBase extends Iterable<Entry<Key,Value>>, AutoCloseable {
/**
* Add a server-side scan iterator.
@@ -160,10 +160,11 @@ public interface ScannerBase extends Iterable<Entry<Key,Value>> {
long getTimeout(TimeUnit timeUnit);
/**
- * Closes any underlying connections on the scanner
+ * Closes any underlying connections on the scanner. This may invalidate any iterators derived from the Scanner, causing them to throw exceptions.
*
* @since 1.5.0
*/
+ @Override
void close();
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
index eedc61d..38e3c07 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
@@ -45,12 +45,14 @@ public class ScannerImplTest {
s.setReadaheadThreshold(Long.MAX_VALUE);
Assert.assertEquals(Long.MAX_VALUE, s.getReadaheadThreshold());
+ s.close();
}
@Test(expected = IllegalArgumentException.class)
public void testInValidReadaheadValues() {
Scanner s = new ScannerImpl(context, "foo", Authorizations.EMPTY);
s.setReadaheadThreshold(-1);
+ s.close();
}
@Test
@@ -58,8 +60,10 @@ public class ScannerImplTest {
Authorizations expected = new Authorizations("a,b");
Scanner s = new ScannerImpl(context, "foo", expected);
assertEquals(expected, s.getAuthorizations());
+ s.close();
}
+ @SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void testNullAuthorizationsFails() {
new ScannerImpl(context, "foo", null);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
index b31050a..af4a474 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
@@ -36,10 +36,12 @@ public class TabletServerBatchReaderTest {
@Test
public void testGetAuthorizations() {
Authorizations expected = new Authorizations("a,b");
- BatchScanner s = new TabletServerBatchReader(context, "foo", expected, 1);
- assertEquals(expected, s.getAuthorizations());
+ try (BatchScanner s = new TabletServerBatchReader(context, "foo", expected, 1)) {
+ assertEquals(expected, s.getAuthorizations());
+ }
}
+ @SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void testNullAuthorizationsFails() {
new TabletServerBatchReader(context, "foo", null, 1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
index b9e1a83..d99f7af 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
@@ -20,8 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
-import jline.console.ConsoleReader;
-
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
@@ -41,6 +39,8 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import jline.console.ConsoleReader;
+
/**
* Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic reservations of a resource at a date. Wait list are also
* supported. In order to keep the example simple, no checking is done of the date. Also the code is inefficient, if interested in improving it take a look at
@@ -88,9 +88,9 @@ public class ARS {
ReservationResult result = ReservationResult.RESERVED;
- ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
-
- try {
+ // it is important to use an isolated scanner so that only whole mutations are seen
+ try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
while (true) {
Status status = cwriter.write(update).getStatus();
switch (status) {
@@ -109,8 +109,6 @@ public class ARS {
// that attempted to make a reservation by putting them later in the list. A more complex solution could involve having independent sub-queues within
// the row that approximately maintain arrival order and use exponential back off to fairly merge the sub-queues into the main queue.
- // it is important to use an isolated scanner so that only whole mutations are seen
- Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
scanner.setRange(new Range(row));
int seq = -1;
@@ -152,10 +150,7 @@ public class ARS {
else
result = ReservationResult.WAIT_LISTED;
}
- } finally {
- cwriter.close();
}
-
}
public void cancel(String what, String when, String who) throws Exception {
@@ -166,13 +161,10 @@ public class ARS {
// will cause any concurrent reservations to retry. If this delete were done using a batch writer, then a concurrent reservation could report WAIT_LISTED
// when it actually got the reservation.
- ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
-
- try {
+ // its important to use an isolated scanner so that only whole mutations are seen
+ try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
while (true) {
-
- // its important to use an isolated scanner so that only whole mutations are seen
- Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
scanner.setRange(new Range(row));
int seq = -1;
@@ -217,8 +209,6 @@ public class ARS {
}
}
- } finally {
- cwriter.close();
}
}
@@ -226,18 +216,19 @@ public class ARS {
String row = what + ":" + when;
// its important to use an isolated scanner so that only whole mutations are seen
- Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
- scanner.setRange(new Range(row));
- scanner.fetchColumnFamily(new Text("res"));
+ try (Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+ scanner.setRange(new Range(row));
+ scanner.fetchColumnFamily(new Text("res"));
- List<String> reservations = new ArrayList<String>();
+ List<String> reservations = new ArrayList<String>();
- for (Entry<Key,Value> entry : scanner) {
- String val = entry.getValue().toString();
- reservations.add(val);
- }
+ for (Entry<Key,Value> entry : scanner) {
+ String val = entry.getValue().toString();
+ reservations.add(val);
+ }
- return reservations;
+ return reservations;
+ }
}
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index b9e52e3..5aa61bc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.server.util;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
@@ -61,8 +62,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
/**
*
*/
@@ -151,42 +150,44 @@ public class MasterMetadataUtil {
// check to see if prev tablet exist in metadata tablet
Key prevRowKey = new Key(new Text(KeyExtent.getMetadataEntry(table, metadataPrevEndRow)));
- ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
+ try (ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
+
+ VolumeManager fs = VolumeManagerImpl.get();
+ if (!scanner2.iterator().hasNext()) {
+ log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
+ MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, lock);
+ return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper));
+ } else {
+ log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);
- VolumeManager fs = VolumeManagerImpl.get();
- if (!scanner2.iterator().hasNext()) {
- log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
- MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, lock);
- return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper));
- } else {
- log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);
+ List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
- List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
+ SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
- Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- Key rowKey = new Key(metadataEntry);
+ try (Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ Key rowKey = new Key(metadataEntry);
- SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<FileRef,DataFileValue>();
- SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
- SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
- scanner3.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));
+ scanner3.fetchColumnFamily(DataFileColumnFamily.NAME);
+ scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));
- for (Entry<Key,Value> entry : scanner3) {
- if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
- origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get()));
+ for (Entry<Key,Value> entry : scanner3) {
+ if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
+ origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get()));
+ }
+ }
}
- }
- MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes,
- highDatafileSizes, highDatafilesToRemove);
+ MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes,
+ highDatafileSizes, highDatafilesToRemove);
- MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, context, lock);
+ MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, context, lock);
- return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow));
+ return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow));
+ }
}
-
}
private static TServerInstance getTServerInstance(String address, ZooLock zooLock) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 5081a9c..416a296 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -280,24 +280,25 @@ public class MetadataTableUtil {
public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
- Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- Text row = extent.getMetadataEntry();
- VolumeManager fs = VolumeManagerImpl.get();
+ try (Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ Text row = extent.getMetadataEntry();
+ VolumeManager fs = VolumeManagerImpl.get();
- Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text(""));
- endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
+ Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text(""));
+ endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
- mdScanner.setRange(new Range(new Key(row), endKey));
- for (Entry<Key,Value> entry : mdScanner) {
+ mdScanner.setRange(new Range(new Key(row), endKey));
+ for (Entry<Key,Value> entry : mdScanner) {
- if (!entry.getKey().getRow().equals(row))
- break;
- DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(new FileRef(fs, entry.getKey()), dfv);
- }
+ if (!entry.getKey().getRow().equals(row))
+ break;
+ DataFileValue dfv = new DataFileValue(entry.getValue().get());
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
+ }
- return sizes;
+ return sizes;
+ }
}
public static void rollBackSplit(Text metadataEntry, Text oldPrevEndRow, ClientContext context, ZooLock zooLock) {
@@ -415,60 +416,59 @@ public class MetadataTableUtil {
}
public static void deleteTable(String tableId, boolean insertDeletes, ClientContext context, ZooLock lock) throws AccumuloException, IOException {
- Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000).setMaxLatency(120000l, TimeUnit.MILLISECONDS)
- .setMaxWriteThreads(2));
+ try (Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
+ BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000)
+ .setMaxLatency(120000l, TimeUnit.MILLISECONDS).setMaxWriteThreads(2))) {
- // scan metadata for our table and delete everything we find
- Mutation m = null;
- ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+ // scan metadata for our table and delete everything we find
+ Mutation m = null;
+ ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- // insert deletes before deleting data from metadata... this makes the code fault tolerant
- if (insertDeletes) {
+ // insert deletes before deleting data from metadata... this makes the code fault tolerant
+ if (insertDeletes) {
- ms.fetchColumnFamily(DataFileColumnFamily.NAME);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms);
+ ms.fetchColumnFamily(DataFileColumnFamily.NAME);
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms);
- for (Entry<Key,Value> cell : ms) {
- Key key = cell.getKey();
+ for (Entry<Key,Value> cell : ms) {
+ Key key = cell.getKey();
- if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
- bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
- }
+ if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
+ bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
+ }
- if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
- bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString()));
+ if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+ bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString()));
+ }
}
- }
- bw.flush();
+ bw.flush();
- ms.clearColumns();
- }
+ ms.clearColumns();
+ }
- for (Entry<Key,Value> cell : ms) {
- Key key = cell.getKey();
+ for (Entry<Key,Value> cell : ms) {
+ Key key = cell.getKey();
- if (m == null) {
- m = new Mutation(key.getRow());
- if (lock != null)
- putLockID(lock, m);
+ if (m == null) {
+ m = new Mutation(key.getRow());
+ if (lock != null)
+ putLockID(lock, m);
+ }
+
+ if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) {
+ bw.addMutation(m);
+ m = new Mutation(key.getRow());
+ if (lock != null)
+ putLockID(lock, m);
+ }
+ m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
}
- if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) {
+ if (m != null)
bw.addMutation(m);
- m = new Mutation(key.getRow());
- if (lock != null)
- putLockID(lock, m);
- }
- m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
}
-
- if (m != null)
- bw.addMutation(m);
-
- bw.close();
}
static String getZookeeperLogLocation() {
@@ -521,23 +521,24 @@ public class MetadataTableUtil {
} else {
String systemTableToCheck = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
- Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY);
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner.setRange(extent.toMetadataRange());
+ try (Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY)) {
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ scanner.setRange(extent.toMetadataRange());
- for (Entry<Key,Value> entry : scanner) {
- if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
- throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry());
- }
+ for (Entry<Key,Value> entry : scanner) {
+ if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
+ throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry());
+ }
- if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
- result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
- } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(new FileRef(fs, entry.getKey()), dfv);
- } else {
- throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
+ if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
+ result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
+ } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ DataFileValue dfv = new DataFileValue(entry.getValue().get());
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
+ } else {
+ throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
+ }
}
}
}
@@ -828,58 +829,56 @@ public class MetadataTableUtil {
public static void cloneTable(ClientContext context, String srcTableId, String tableId, VolumeManager volumeManager) throws Exception {
Connector conn = context.getConnector();
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ try (BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
- while (true) {
+ while (true) {
- try {
- initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
+ try {
+ initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
- // the following loop looks changes in the file that occurred during the copy.. if files were dereferenced then they could have been GCed
+ // the following loop looks changes in the file that occurred during the copy.. if files were dereferenced then they could have been GCed
- while (true) {
- int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
+ while (true) {
+ int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
- if (rewrites == 0)
- break;
- }
+ if (rewrites == 0)
+ break;
+ }
- bw.flush();
- break;
+ bw.flush();
+ break;
- } catch (TabletIterator.TabletDeletedException tde) {
- // tablets were merged in the src table
- bw.flush();
+ } catch (TabletIterator.TabletDeletedException tde) {
+ // tablets were merged in the src table
+ bw.flush();
- // delete what we have cloned and try again
- deleteTable(tableId, false, context, null);
+ // delete what we have cloned and try again
+ deleteTable(tableId, false, context, null);
- log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again");
+ log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again");
- sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
}
- }
- // delete the clone markers and create directory entries
- Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
+ // delete the clone markers and create directory entries
+ Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
- int dirCount = 0;
+ int dirCount = 0;
- for (Entry<Key,Value> entry : mscanner) {
- Key k = entry.getKey();
- Mutation m = new Mutation(k.getRow());
- m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
- String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId
- + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES));
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8)));
+ for (Entry<Key,Value> entry : mscanner) {
+ Key k = entry.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
+ String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId
+ + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES));
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8)));
- bw.addMutation(m);
+ bw.addMutation(m);
+ }
}
-
- bw.close();
-
}
public static void chopped(AccumuloServerContext context, KeyExtent extent, ZooLock zooLock) {
@@ -889,27 +888,26 @@ public class MetadataTableUtil {
}
public static void removeBulkLoadEntries(Connector conn, String tableId, long tid) throws Exception {
- Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- for (Entry<Key,Value> entry : mscanner) {
- log.debug("Looking at entry " + entry + " with tid " + tid);
- if (Long.parseLong(entry.getValue().toString()) == tid) {
- log.debug("deleting entry " + entry);
- Mutation m = new Mutation(entry.getKey().getRow());
- m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
- bw.addMutation(m);
+ try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
+ mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+ for (Entry<Key,Value> entry : mscanner) {
+ log.debug("Looking at entry " + entry + " with tid " + tid);
+ if (Long.parseLong(entry.getValue().toString()) == tid) {
+ log.debug("deleting entry " + entry);
+ Mutation m = new Mutation(entry.getKey().getRow());
+ m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+ bw.addMutation(m);
+ }
}
}
- bw.close();
}
public static List<FileRef> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException {
List<FileRef> result = new ArrayList<FileRef>();
- try {
+ try (Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY))) {
VolumeManager fs = VolumeManagerImpl.get();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY));
mscanner.setRange(extent.toMetadataRange());
mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
for (Entry<Key,Value> entry : mscanner) {
@@ -917,6 +915,7 @@ public class MetadataTableUtil {
result.add(new FileRef(fs, entry.getKey()));
}
}
+
return result;
} catch (TableNotFoundException ex) {
// unlikely
@@ -929,16 +928,17 @@ public class MetadataTableUtil {
Map<Long,List<FileRef>> result = new HashMap<>();
VolumeManager fs = VolumeManagerImpl.get();
- Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY);
- scanner.setRange(new Range(metadataRow));
- scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
- for (Entry<Key,Value> entry : scanner) {
- Long tid = Long.parseLong(entry.getValue().toString());
- List<FileRef> lst = result.get(tid);
- if (lst == null) {
- result.put(tid, lst = new ArrayList<>());
+ try (Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY)) {
+ scanner.setRange(new Range(metadataRow));
+ scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+ for (Entry<Key,Value> entry : scanner) {
+ Long tid = Long.parseLong(entry.getValue().toString());
+ List<FileRef> lst = result.get(tid);
+ if (lst == null) {
+ result.put(tid, lst = new ArrayList<>());
+ }
+ lst.add(new FileRef(fs, entry.getKey()));
}
- lst.add(new FileRef(fs, entry.getKey()));
}
return result;
}
@@ -985,14 +985,15 @@ public class MetadataTableUtil {
Range oldDeletesRange = new Range(oldDeletesPrefix, true, "!!~dem", false);
// move old delete markers to new location, to standardize table schema between all metadata tables
- Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY);
- scanner.setRange(oldDeletesRange);
- for (Entry<Key,Value> entry : scanner) {
- String row = entry.getKey().getRow().toString();
- if (row.startsWith(oldDeletesPrefix)) {
- moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix);
- } else {
- break;
+ try (Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY)) {
+ scanner.setRange(oldDeletesRange);
+ for (Entry<Key,Value> entry : scanner) {
+ String row = entry.getKey().getRow().toString();
+ if (row.startsWith(oldDeletesPrefix)) {
+ moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix);
+ } else {
+ break;
+ }
}
}
}
@@ -1002,14 +1003,15 @@ public class MetadataTableUtil {
KeyExtent notMetadata = new KeyExtent("anythingNotMetadata", null, null);
// move delete markers from the normal delete keyspace to the root tablet delete keyspace if the files are for the !METADATA table
- Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- scanner.setRange(MetadataSchema.DeletesSection.getRange());
- for (Entry<Key,Value> entry : scanner) {
- String row = entry.getKey().getRow().toString();
- if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) {
- moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix());
- } else {
- break;
+ try (Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ scanner.setRange(MetadataSchema.DeletesSection.getRange());
+ for (Entry<Key,Value> entry : scanner) {
+ String row = entry.getKey().getRow().toString();
+ if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) {
+ moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix());
+ } else {
+ break;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 5e8c038..cc43802 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.gc;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.UnknownHostException;
@@ -110,7 +112,6 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import com.google.protobuf.InvalidProtocolBufferException;
public class SimpleGarbageCollector extends AccumuloServerContext implements Iface {
@@ -269,6 +270,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
@Override
public Iterator<String> getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ @SuppressWarnings("resource")
IsolatedScanner scanner = new IsolatedScanner(getConnector().createScanner(tableName, Authorizations.EMPTY));
scanner.setRange(MetadataSchema.BlipSection.getRange());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
index 068aa81..5fbf3a0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
@@ -111,16 +111,17 @@ class CopyFailed extends MasterRepo {
// determine which failed files were loaded
Connector conn = master.getConnector();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
- for (Entry<Key,Value> entry : mscanner) {
- if (Long.parseLong(entry.getValue().toString()) == tid) {
- FileRef loadedFile = new FileRef(fs, entry.getKey());
- String absPath = failures.remove(loadedFile);
- if (absPath != null) {
- loadedFailures.put(loadedFile, absPath);
+ try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY))) {
+ mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+
+ for (Entry<Key,Value> entry : mscanner) {
+ if (Long.parseLong(entry.getValue().toString()) == tid) {
+ FileRef loadedFile = new FileRef(fs, entry.getKey());
+ String absPath = failures.remove(loadedFile);
+ if (absPath != null) {
+ loadedFailures.put(loadedFile, absPath);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1523c55..6427b29 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -142,6 +142,8 @@ import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -256,8 +258,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.net.HostAndPort;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
public class TabletServer extends AccumuloServerContext implements Runnable {
@@ -2595,12 +2595,12 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN,
TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN});
- ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY);
- scanner.setRange(extent.toMetadataRange());
-
TreeMap<Key,Value> tkv = new TreeMap<Key,Value>();
- for (Entry<Key,Value> entry : scanner)
- tkv.put(entry.getKey(), entry.getValue());
+ try (ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY)) {
+ scanner.setRange(extent.toMetadataRange());
+ for (Entry<Key,Value> entry : scanner)
+ tkv.put(entry.getKey(), entry.getValue());
+ }
// only populate map after success
if (tabletsKeyValues == null) {