You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2016/06/02 16:38:48 UTC
[2/3] accumulo git commit: ACCUMULO-4318 Made writers and scanners
auto closeable
ACCUMULO-4318 Made writers and scanners auto closeable
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e67317cb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e67317cb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e67317cb
Branch: refs/heads/master
Commit: e67317cb267744cae11872d373223234459600be
Parents: 61a7de4
Author: Keith Turner <ke...@deenlo.com>
Authored: Thu Jun 2 12:26:02 2016 -0400
Committer: Keith Turner <ke...@deenlo.com>
Committed: Thu Jun 2 12:26:02 2016 -0400
----------------------------------------------------------------------
.../accumulo/core/client/BatchWriter.java | 3 +-
.../accumulo/core/client/ConditionalWriter.java | 3 +-
.../accumulo/core/client/ScannerBase.java | 5 +-
.../core/client/impl/ScannerImplTest.java | 4 +
.../impl/TabletServerBatchReaderTest.java | 6 +-
.../examples/simple/reservations/ARS.java | 45 +-
.../server/util/MasterMetadataUtil.java | 57 +-
.../accumulo/server/util/MetadataTableUtil.java | 288 ++---
.../accumulo/gc/SimpleGarbageCollector.java | 4 +-
.../accumulo/master/tableOps/CopyFailed.java | 21 +-
.../apache/accumulo/tserver/TabletServer.java | 14 +-
.../accumulo/test/ConditionalWriterIT.java | 1144 +++++++++---------
.../test/functional/BatchWriterFlushIT.java | 33 +-
.../accumulo/test/functional/ReadWriteIT.java | 62 +-
.../test/functional/SplitRecoveryIT.java | 72 +-
.../test/randomwalk/bulk/ConsistencyCheck.java | 24 +-
.../test/randomwalk/conditional/Transfer.java | 94 +-
.../test/randomwalk/conditional/Verify.java | 37 +-
18 files changed, 959 insertions(+), 957 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
index b4d81aa..95d87c5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Mutation;
* In the event that an MutationsRejectedException exception is thrown by one of the methods on a BatchWriter instance, the user should close the current
* instance and create a new instance. This is a known limitation which will be addressed by ACCUMULO-2990 in the future.
*/
-public interface BatchWriter {
+public interface BatchWriter extends AutoCloseable {
/**
* Queues one mutation to write.
@@ -66,6 +66,7 @@ public interface BatchWriter {
* @throws MutationsRejectedException
* this could be thrown because current or previous mutations failed
*/
+ @Override
void close() throws MutationsRejectedException;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 62244e6..d13dc09 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -28,7 +28,7 @@ import org.apache.accumulo.core.data.ConditionalMutation;
*
* @since 1.6.0
*/
-public interface ConditionalWriter {
+public interface ConditionalWriter extends AutoCloseable {
class Result {
private Status status;
@@ -131,5 +131,6 @@ public interface ConditionalWriter {
/**
* release any resources (like threads pools) used by conditional writer
*/
+ @Override
void close();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 354f6f4..2110050 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.Text;
* This class hosts configuration methods that are shared between different types of scanners.
*
*/
-public interface ScannerBase extends Iterable<Entry<Key,Value>> {
+public interface ScannerBase extends Iterable<Entry<Key,Value>>, AutoCloseable {
/**
* Add a server-side scan iterator.
@@ -160,10 +160,11 @@ public interface ScannerBase extends Iterable<Entry<Key,Value>> {
long getTimeout(TimeUnit timeUnit);
/**
- * Closes any underlying connections on the scanner
+ * Closes any underlying connections on the scanner. This may invalidate any iterators derived from the Scanner, causing them to throw exceptions.
*
* @since 1.5.0
*/
+ @Override
void close();
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
index eedc61d..38e3c07 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
@@ -45,12 +45,14 @@ public class ScannerImplTest {
s.setReadaheadThreshold(Long.MAX_VALUE);
Assert.assertEquals(Long.MAX_VALUE, s.getReadaheadThreshold());
+ s.close();
}
@Test(expected = IllegalArgumentException.class)
public void testInValidReadaheadValues() {
Scanner s = new ScannerImpl(context, "foo", Authorizations.EMPTY);
s.setReadaheadThreshold(-1);
+ s.close();
}
@Test
@@ -58,8 +60,10 @@ public class ScannerImplTest {
Authorizations expected = new Authorizations("a,b");
Scanner s = new ScannerImpl(context, "foo", expected);
assertEquals(expected, s.getAuthorizations());
+ s.close();
}
+ @SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void testNullAuthorizationsFails() {
new ScannerImpl(context, "foo", null);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
index b31050a..af4a474 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
@@ -36,10 +36,12 @@ public class TabletServerBatchReaderTest {
@Test
public void testGetAuthorizations() {
Authorizations expected = new Authorizations("a,b");
- BatchScanner s = new TabletServerBatchReader(context, "foo", expected, 1);
- assertEquals(expected, s.getAuthorizations());
+ try (BatchScanner s = new TabletServerBatchReader(context, "foo", expected, 1)) {
+ assertEquals(expected, s.getAuthorizations());
+ }
}
+ @SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void testNullAuthorizationsFails() {
new TabletServerBatchReader(context, "foo", null, 1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
index b9e1a83..d99f7af 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
@@ -20,8 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
-import jline.console.ConsoleReader;
-
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
@@ -41,6 +39,8 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import jline.console.ConsoleReader;
+
/**
* Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic reservations of a resource at a date. Wait list are also
* supported. In order to keep the example simple, no checking is done of the date. Also the code is inefficient, if interested in improving it take a look at
@@ -88,9 +88,9 @@ public class ARS {
ReservationResult result = ReservationResult.RESERVED;
- ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
-
- try {
+ // it is important to use an isolated scanner so that only whole mutations are seen
+ try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
while (true) {
Status status = cwriter.write(update).getStatus();
switch (status) {
@@ -109,8 +109,6 @@ public class ARS {
// that attempted to make a reservation by putting them later in the list. A more complex solution could involve having independent sub-queues within
// the row that approximately maintain arrival order and use exponential back off to fairly merge the sub-queues into the main queue.
- // it is important to use an isolated scanner so that only whole mutations are seen
- Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
scanner.setRange(new Range(row));
int seq = -1;
@@ -152,10 +150,7 @@ public class ARS {
else
result = ReservationResult.WAIT_LISTED;
}
- } finally {
- cwriter.close();
}
-
}
public void cancel(String what, String when, String who) throws Exception {
@@ -166,13 +161,10 @@ public class ARS {
// will cause any concurrent reservations to retry. If this delete were done using a batch writer, then a concurrent reservation could report WAIT_LISTED
// when it actually got the reservation.
- ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
-
- try {
+ // its important to use an isolated scanner so that only whole mutations are seen
+ try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
while (true) {
-
- // its important to use an isolated scanner so that only whole mutations are seen
- Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
scanner.setRange(new Range(row));
int seq = -1;
@@ -217,8 +209,6 @@ public class ARS {
}
}
- } finally {
- cwriter.close();
}
}
@@ -226,18 +216,19 @@ public class ARS {
String row = what + ":" + when;
// its important to use an isolated scanner so that only whole mutations are seen
- Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
- scanner.setRange(new Range(row));
- scanner.fetchColumnFamily(new Text("res"));
+ try (Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+ scanner.setRange(new Range(row));
+ scanner.fetchColumnFamily(new Text("res"));
- List<String> reservations = new ArrayList<String>();
+ List<String> reservations = new ArrayList<String>();
- for (Entry<Key,Value> entry : scanner) {
- String val = entry.getValue().toString();
- reservations.add(val);
- }
+ for (Entry<Key,Value> entry : scanner) {
+ String val = entry.getValue().toString();
+ reservations.add(val);
+ }
- return reservations;
+ return reservations;
+ }
}
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index b9e52e3..5aa61bc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.server.util;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
@@ -61,8 +62,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
/**
*
*/
@@ -151,42 +150,44 @@ public class MasterMetadataUtil {
// check to see if prev tablet exist in metadata tablet
Key prevRowKey = new Key(new Text(KeyExtent.getMetadataEntry(table, metadataPrevEndRow)));
- ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
+ try (ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
+
+ VolumeManager fs = VolumeManagerImpl.get();
+ if (!scanner2.iterator().hasNext()) {
+ log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
+ MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, lock);
+ return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper));
+ } else {
+ log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);
- VolumeManager fs = VolumeManagerImpl.get();
- if (!scanner2.iterator().hasNext()) {
- log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
- MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, lock);
- return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper));
- } else {
- log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);
+ List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
- List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
+ SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
- Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- Key rowKey = new Key(metadataEntry);
+ try (Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ Key rowKey = new Key(metadataEntry);
- SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<FileRef,DataFileValue>();
- SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
- SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
- scanner3.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));
+ scanner3.fetchColumnFamily(DataFileColumnFamily.NAME);
+ scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));
- for (Entry<Key,Value> entry : scanner3) {
- if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
- origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get()));
+ for (Entry<Key,Value> entry : scanner3) {
+ if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
+ origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get()));
+ }
+ }
}
- }
- MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes,
- highDatafileSizes, highDatafilesToRemove);
+ MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes,
+ highDatafileSizes, highDatafilesToRemove);
- MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, context, lock);
+ MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, context, lock);
- return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow));
+ return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow));
+ }
}
-
}
private static TServerInstance getTServerInstance(String address, ZooLock zooLock) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 5081a9c..416a296 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -280,24 +280,25 @@ public class MetadataTableUtil {
public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
- Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- Text row = extent.getMetadataEntry();
- VolumeManager fs = VolumeManagerImpl.get();
+ try (Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ Text row = extent.getMetadataEntry();
+ VolumeManager fs = VolumeManagerImpl.get();
- Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text(""));
- endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
+ Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text(""));
+ endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
- mdScanner.setRange(new Range(new Key(row), endKey));
- for (Entry<Key,Value> entry : mdScanner) {
+ mdScanner.setRange(new Range(new Key(row), endKey));
+ for (Entry<Key,Value> entry : mdScanner) {
- if (!entry.getKey().getRow().equals(row))
- break;
- DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(new FileRef(fs, entry.getKey()), dfv);
- }
+ if (!entry.getKey().getRow().equals(row))
+ break;
+ DataFileValue dfv = new DataFileValue(entry.getValue().get());
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
+ }
- return sizes;
+ return sizes;
+ }
}
public static void rollBackSplit(Text metadataEntry, Text oldPrevEndRow, ClientContext context, ZooLock zooLock) {
@@ -415,60 +416,59 @@ public class MetadataTableUtil {
}
public static void deleteTable(String tableId, boolean insertDeletes, ClientContext context, ZooLock lock) throws AccumuloException, IOException {
- Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000).setMaxLatency(120000l, TimeUnit.MILLISECONDS)
- .setMaxWriteThreads(2));
+ try (Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
+ BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000)
+ .setMaxLatency(120000l, TimeUnit.MILLISECONDS).setMaxWriteThreads(2))) {
- // scan metadata for our table and delete everything we find
- Mutation m = null;
- ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+ // scan metadata for our table and delete everything we find
+ Mutation m = null;
+ ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- // insert deletes before deleting data from metadata... this makes the code fault tolerant
- if (insertDeletes) {
+ // insert deletes before deleting data from metadata... this makes the code fault tolerant
+ if (insertDeletes) {
- ms.fetchColumnFamily(DataFileColumnFamily.NAME);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms);
+ ms.fetchColumnFamily(DataFileColumnFamily.NAME);
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms);
- for (Entry<Key,Value> cell : ms) {
- Key key = cell.getKey();
+ for (Entry<Key,Value> cell : ms) {
+ Key key = cell.getKey();
- if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
- bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
- }
+ if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
+ bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
+ }
- if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
- bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString()));
+ if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+ bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString()));
+ }
}
- }
- bw.flush();
+ bw.flush();
- ms.clearColumns();
- }
+ ms.clearColumns();
+ }
- for (Entry<Key,Value> cell : ms) {
- Key key = cell.getKey();
+ for (Entry<Key,Value> cell : ms) {
+ Key key = cell.getKey();
- if (m == null) {
- m = new Mutation(key.getRow());
- if (lock != null)
- putLockID(lock, m);
+ if (m == null) {
+ m = new Mutation(key.getRow());
+ if (lock != null)
+ putLockID(lock, m);
+ }
+
+ if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) {
+ bw.addMutation(m);
+ m = new Mutation(key.getRow());
+ if (lock != null)
+ putLockID(lock, m);
+ }
+ m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
}
- if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) {
+ if (m != null)
bw.addMutation(m);
- m = new Mutation(key.getRow());
- if (lock != null)
- putLockID(lock, m);
- }
- m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
}
-
- if (m != null)
- bw.addMutation(m);
-
- bw.close();
}
static String getZookeeperLogLocation() {
@@ -521,23 +521,24 @@ public class MetadataTableUtil {
} else {
String systemTableToCheck = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
- Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY);
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner.setRange(extent.toMetadataRange());
+ try (Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY)) {
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ scanner.setRange(extent.toMetadataRange());
- for (Entry<Key,Value> entry : scanner) {
- if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
- throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry());
- }
+ for (Entry<Key,Value> entry : scanner) {
+ if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
+ throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry());
+ }
- if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
- result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
- } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(new FileRef(fs, entry.getKey()), dfv);
- } else {
- throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
+ if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
+ result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
+ } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ DataFileValue dfv = new DataFileValue(entry.getValue().get());
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
+ } else {
+ throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
+ }
}
}
}
@@ -828,58 +829,56 @@ public class MetadataTableUtil {
public static void cloneTable(ClientContext context, String srcTableId, String tableId, VolumeManager volumeManager) throws Exception {
Connector conn = context.getConnector();
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ try (BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
- while (true) {
+ while (true) {
- try {
- initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
+ try {
+ initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
- // the following loop looks changes in the file that occurred during the copy.. if files were dereferenced then they could have been GCed
+ // the following loop looks changes in the file that occurred during the copy.. if files were dereferenced then they could have been GCed
- while (true) {
- int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
+ while (true) {
+ int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
- if (rewrites == 0)
- break;
- }
+ if (rewrites == 0)
+ break;
+ }
- bw.flush();
- break;
+ bw.flush();
+ break;
- } catch (TabletIterator.TabletDeletedException tde) {
- // tablets were merged in the src table
- bw.flush();
+ } catch (TabletIterator.TabletDeletedException tde) {
+ // tablets were merged in the src table
+ bw.flush();
- // delete what we have cloned and try again
- deleteTable(tableId, false, context, null);
+ // delete what we have cloned and try again
+ deleteTable(tableId, false, context, null);
- log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again");
+ log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again");
- sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
}
- }
- // delete the clone markers and create directory entries
- Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
+ // delete the clone markers and create directory entries
+ Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
- int dirCount = 0;
+ int dirCount = 0;
- for (Entry<Key,Value> entry : mscanner) {
- Key k = entry.getKey();
- Mutation m = new Mutation(k.getRow());
- m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
- String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId
- + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES));
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8)));
+ for (Entry<Key,Value> entry : mscanner) {
+ Key k = entry.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
+ String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId
+ + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES));
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8)));
- bw.addMutation(m);
+ bw.addMutation(m);
+ }
}
-
- bw.close();
-
}
public static void chopped(AccumuloServerContext context, KeyExtent extent, ZooLock zooLock) {
@@ -889,27 +888,26 @@ public class MetadataTableUtil {
}
public static void removeBulkLoadEntries(Connector conn, String tableId, long tid) throws Exception {
- Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- for (Entry<Key,Value> entry : mscanner) {
- log.debug("Looking at entry " + entry + " with tid " + tid);
- if (Long.parseLong(entry.getValue().toString()) == tid) {
- log.debug("deleting entry " + entry);
- Mutation m = new Mutation(entry.getKey().getRow());
- m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
- bw.addMutation(m);
+ try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
+ mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+ for (Entry<Key,Value> entry : mscanner) {
+ log.debug("Looking at entry " + entry + " with tid " + tid);
+ if (Long.parseLong(entry.getValue().toString()) == tid) {
+ log.debug("deleting entry " + entry);
+ Mutation m = new Mutation(entry.getKey().getRow());
+ m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+ bw.addMutation(m);
+ }
}
}
- bw.close();
}
public static List<FileRef> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException {
List<FileRef> result = new ArrayList<FileRef>();
- try {
+ try (Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY))) {
VolumeManager fs = VolumeManagerImpl.get();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY));
mscanner.setRange(extent.toMetadataRange());
mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
for (Entry<Key,Value> entry : mscanner) {
@@ -917,6 +915,7 @@ public class MetadataTableUtil {
result.add(new FileRef(fs, entry.getKey()));
}
}
+
return result;
} catch (TableNotFoundException ex) {
// unlikely
@@ -929,16 +928,17 @@ public class MetadataTableUtil {
Map<Long,List<FileRef>> result = new HashMap<>();
VolumeManager fs = VolumeManagerImpl.get();
- Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY);
- scanner.setRange(new Range(metadataRow));
- scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
- for (Entry<Key,Value> entry : scanner) {
- Long tid = Long.parseLong(entry.getValue().toString());
- List<FileRef> lst = result.get(tid);
- if (lst == null) {
- result.put(tid, lst = new ArrayList<>());
+ try (Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY)) {
+ scanner.setRange(new Range(metadataRow));
+ scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+ for (Entry<Key,Value> entry : scanner) {
+ Long tid = Long.parseLong(entry.getValue().toString());
+ List<FileRef> lst = result.get(tid);
+ if (lst == null) {
+ result.put(tid, lst = new ArrayList<>());
+ }
+ lst.add(new FileRef(fs, entry.getKey()));
}
- lst.add(new FileRef(fs, entry.getKey()));
}
return result;
}
@@ -985,14 +985,15 @@ public class MetadataTableUtil {
Range oldDeletesRange = new Range(oldDeletesPrefix, true, "!!~dem", false);
// move old delete markers to new location, to standardize table schema between all metadata tables
- Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY);
- scanner.setRange(oldDeletesRange);
- for (Entry<Key,Value> entry : scanner) {
- String row = entry.getKey().getRow().toString();
- if (row.startsWith(oldDeletesPrefix)) {
- moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix);
- } else {
- break;
+ try (Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY)) {
+ scanner.setRange(oldDeletesRange);
+ for (Entry<Key,Value> entry : scanner) {
+ String row = entry.getKey().getRow().toString();
+ if (row.startsWith(oldDeletesPrefix)) {
+ moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix);
+ } else {
+ break;
+ }
}
}
}
@@ -1002,14 +1003,15 @@ public class MetadataTableUtil {
KeyExtent notMetadata = new KeyExtent("anythingNotMetadata", null, null);
// move delete markers from the normal delete keyspace to the root tablet delete keyspace if the files are for the !METADATA table
- Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
- scanner.setRange(MetadataSchema.DeletesSection.getRange());
- for (Entry<Key,Value> entry : scanner) {
- String row = entry.getKey().getRow().toString();
- if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) {
- moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix());
- } else {
- break;
+ try (Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+ scanner.setRange(MetadataSchema.DeletesSection.getRange());
+ for (Entry<Key,Value> entry : scanner) {
+ String row = entry.getKey().getRow().toString();
+ if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) {
+ moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix());
+ } else {
+ break;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 5e8c038..cc43802 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.gc;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.UnknownHostException;
@@ -110,7 +112,6 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import com.google.protobuf.InvalidProtocolBufferException;
public class SimpleGarbageCollector extends AccumuloServerContext implements Iface {
@@ -269,6 +270,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
@Override
public Iterator<String> getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ @SuppressWarnings("resource")
IsolatedScanner scanner = new IsolatedScanner(getConnector().createScanner(tableName, Authorizations.EMPTY));
scanner.setRange(MetadataSchema.BlipSection.getRange());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
index 068aa81..5fbf3a0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
@@ -111,16 +111,17 @@ class CopyFailed extends MasterRepo {
// determine which failed files were loaded
Connector conn = master.getConnector();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
- for (Entry<Key,Value> entry : mscanner) {
- if (Long.parseLong(entry.getValue().toString()) == tid) {
- FileRef loadedFile = new FileRef(fs, entry.getKey());
- String absPath = failures.remove(loadedFile);
- if (absPath != null) {
- loadedFailures.put(loadedFile, absPath);
+ try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY))) {
+ mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+
+ for (Entry<Key,Value> entry : mscanner) {
+ if (Long.parseLong(entry.getValue().toString()) == tid) {
+ FileRef loadedFile = new FileRef(fs, entry.getKey());
+ String absPath = failures.remove(loadedFile);
+ if (absPath != null) {
+ loadedFailures.put(loadedFile, absPath);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1523c55..6427b29 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -142,6 +142,8 @@ import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -256,8 +258,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.net.HostAndPort;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
public class TabletServer extends AccumuloServerContext implements Runnable {
@@ -2595,12 +2595,12 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN,
TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN});
- ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY);
- scanner.setRange(extent.toMetadataRange());
-
TreeMap<Key,Value> tkv = new TreeMap<Key,Value>();
- for (Entry<Key,Value> entry : scanner)
- tkv.put(entry.getKey(), entry.getValue());
+ try (ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY)) {
+ scanner.setRange(extent.toMetadataRange());
+ for (Entry<Key,Value> entry : scanner)
+ tkv.put(entry.getKey(), entry.getValue());
+ }
// only populate map after success
if (tabletsKeyValues == null) {