You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2019/02/21 23:00:16 UTC
[accumulo] branch master updated: Use lambdas when creating Threads (#975)
This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 3d91fa6 Use lambdas when creating Threads (#975)
3d91fa6 is described below
commit 3d91fa6291e274120082148f890a7fcb2d1fd324
Author: Mike Walch <mw...@apache.org>
AuthorDate: Thu Feb 21 18:00:10 2019 -0500
Use lambdas when creating Threads (#975)
---
.../main/java/org/apache/accumulo/proxy/Proxy.java | 2 +-
.../org/apache/accumulo/tracer/TracerTest.java | 7 +--
.../main/java/org/apache/accumulo/shell/Shell.java | 15 +++---
.../accumulo/test/InterruptibleScannersIT.java | 53 ++++++++++------------
.../apache/accumulo/test/MultiTableRecoveryIT.java | 36 +++++++--------
.../org/apache/accumulo/test/ShellServerIT.java | 11 +----
.../apache/accumulo/test/SplitCancelsMajCIT.java | 15 +++---
.../accumulo/test/TabletServerGivesUpIT.java | 19 ++++----
.../test/functional/DeleteRowsSplitIT.java | 23 ++++------
.../accumulo/test/functional/ReadWriteIT.java | 15 +++---
.../accumulo/test/functional/ShutdownIT.java | 17 +++----
.../accumulo/test/functional/ZooCacheIT.java | 17 +++----
12 files changed, 95 insertions(+), 135 deletions(-)
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index b117fee..99ccdae 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -150,7 +150,7 @@ public class Proxy implements KeywordExecutable {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
accumulo.stop();
- } catch (InterruptedException|IOException e) {
+ } catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
} finally {
if (!folder.delete())
diff --git a/server/tracer/src/test/java/org/apache/accumulo/tracer/TracerTest.java b/server/tracer/src/test/java/org/apache/accumulo/tracer/TracerTest.java
index 4b95777..797538d 100644
--- a/server/tracer/src/test/java/org/apache/accumulo/tracer/TracerTest.java
+++ b/server/tracer/src/test/java/org/apache/accumulo/tracer/TracerTest.java
@@ -168,12 +168,7 @@ public class TracerTest {
TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport);
args.processor(new Processor<Iface>(TraceWrap.service(new Service())));
final TServer tserver = new TThreadPoolServer(args);
- Thread t = new Thread() {
- @Override
- public void run() {
- tserver.serve();
- }
- };
+ Thread t = new Thread(tserver::serve);
t.start();
TTransport clientTransport = new TSocket(new Socket("localhost", socket.getLocalPort()));
TestService.Iface client = new TestService.Client(new TBinaryProtocol(clientTransport),
diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
index 92d84c4..b928524 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
@@ -538,16 +538,13 @@ public class Shell extends ShellOptions implements KeywordExecutable {
final FileHistory history = new FileHistory(new File(historyPath));
reader.setHistory(history);
// Add shutdown hook to flush file history, per jline javadocs
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- history.flush();
- } catch (IOException e) {
- log.warn("Could not flush history to file.");
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ history.flush();
+ } catch (IOException e) {
+ log.warn("Could not flush history to file.");
}
- });
+ }));
} catch (IOException e) {
log.warn("Unable to load history file at " + historyPath);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
index abc855f..a647cde 100644
--- a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
@@ -62,37 +62,34 @@ public class InterruptibleScannersIT extends AccumuloClusterHarness {
scanner.addScanIterator(cfg);
// create a thread to interrupt the slow scan
final Thread scanThread = Thread.currentThread();
- Thread thread = new Thread() {
- @Override
- public void run() {
- try {
- // ensure the scan is running: not perfect, the metadata tables could be scanned, too.
- String tserver = client.instanceOperations().getTabletServers().iterator().next();
- do {
- ArrayList<ActiveScan> scans = new ArrayList<>(
- client.instanceOperations().getActiveScans(tserver));
- Iterator<ActiveScan> iter = scans.iterator();
- while (iter.hasNext()) {
- ActiveScan scan = iter.next();
- // Remove scans not against our table and not owned by us
- if (!getAdminPrincipal().equals(scan.getUser())
- || !tableName.equals(scan.getTable())) {
- iter.remove();
- }
+ Thread thread = new Thread(() -> {
+ try {
+ // ensure the scan is running: not perfect, the metadata tables could be scanned, too.
+ String tserver = client.instanceOperations().getTabletServers().iterator().next();
+ do {
+ ArrayList<ActiveScan> scans = new ArrayList<>(
+ client.instanceOperations().getActiveScans(tserver));
+ Iterator<ActiveScan> iter = scans.iterator();
+ while (iter.hasNext()) {
+ ActiveScan scan = iter.next();
+ // Remove scans not against our table and not owned by us
+ if (!getAdminPrincipal().equals(scan.getUser())
+ || !tableName.equals(scan.getTable())) {
+ iter.remove();
}
+ }
- if (!scans.isEmpty()) {
- // We found our scan
- break;
- }
- } while (true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- // BAM!
- scanThread.interrupt();
+ if (!scans.isEmpty()) {
+ // We found our scan
+ break;
+ }
+ } while (true);
+ } catch (Exception e) {
+ e.printStackTrace();
}
- };
+ // BAM!
+ scanThread.interrupt();
+ });
thread.start();
try {
// Use the scanner, expect problems
diff --git a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
index 52716d2..16d660e 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
@@ -20,6 +20,7 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.security.SecureRandom;
import java.util.Map.Entry;
import java.util.Random;
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -113,26 +115,22 @@ public class MultiTableRecoveryIT extends ConfigurableMacBase {
}
private Thread agitator(final AtomicBoolean stop) {
- return new Thread() {
- @Override
- public void run() {
- try (AccumuloClient client = createClient()) {
- int i = 0;
- while (!stop.get()) {
- sleepUninterruptibly(10, TimeUnit.SECONDS);
- System.out.println("Restarting");
- getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
- getCluster().start();
- // read the metadata table to know everything is back up
- Iterators
- .size(client.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
- i++;
- }
- System.out.println("Restarted " + i + " times");
- } catch (Exception ex) {
- log.error("{}", ex.getMessage(), ex);
+ return new Thread(() -> {
+ try (AccumuloClient client = createClient()) {
+ int i = 0;
+ while (!stop.get()) {
+ sleepUninterruptibly(10, TimeUnit.SECONDS);
+ System.out.println("Restarting");
+ getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
+ getCluster().start();
+ // read the metadata table to know everything is back up
+ Iterators.size(client.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+ i++;
}
+ System.out.println("Restarted " + i + " times");
+ } catch (IOException | InterruptedException | TableNotFoundException ex) {
+ log.error("{}", ex.getMessage(), ex);
}
- };
+ });
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
index 508d6fb..6f59e15 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
@@ -1570,16 +1570,7 @@ public class ShellServerIT extends SharedMiniClusterBase {
SlowIterator.setSleepTime(cfg, 500);
s.addScanIterator(cfg);
- Thread thread = new Thread() {
- @Override
- public void run() {
- try {
- Iterators.size(s.iterator());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
+ Thread thread = new Thread(() -> Iterators.size(s.iterator()));
thread.start();
List<String> scans = new ArrayList<>();
diff --git a/test/src/main/java/org/apache/accumulo/test/SplitCancelsMajCIT.java b/test/src/main/java/org/apache/accumulo/test/SplitCancelsMajCIT.java
index c4ef963..d9bac52 100644
--- a/test/src/main/java/org/apache/accumulo/test/SplitCancelsMajCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/SplitCancelsMajCIT.java
@@ -76,16 +76,13 @@ public class SplitCancelsMajCIT extends SharedMiniClusterBase {
}
// start majc
final AtomicReference<Exception> ex = new AtomicReference<>();
- Thread thread = new Thread() {
- @Override
- public void run() {
- try {
- c.tableOperations().compact(tableName, null, null, true, true);
- } catch (Exception e) {
- ex.set(e);
- }
+ Thread thread = new Thread(() -> {
+ try {
+ c.tableOperations().compact(tableName, null, null, true, true);
+ } catch (Exception e) {
+ ex.set(e);
}
- };
+ });
thread.start();
long now = System.currentTimeMillis();
diff --git a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
index ac4172d..f06f74e 100644
--- a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
@@ -56,18 +56,15 @@ public class TabletServerGivesUpIT extends ConfigurableMacBase {
cluster.getMiniDfs().shutdown();
// ask the tserver to do something
final AtomicReference<Exception> ex = new AtomicReference<>();
- Thread splitter = new Thread() {
- @Override
- public void run() {
- try {
- TreeSet<Text> splits = new TreeSet<>();
- splits.add(new Text("X"));
- client.tableOperations().addSplits(tableName, splits);
- } catch (Exception e) {
- ex.set(e);
- }
+ Thread splitter = new Thread(() -> {
+ try {
+ TreeSet<Text> splits = new TreeSet<>();
+ splits.add(new Text("X"));
+ client.tableOperations().addSplits(tableName, splits);
+ } catch (Exception e) {
+ ex.set(e);
}
- };
+ });
splitter.start();
// wait for the tserver to give up on writing to the WAL
while (client.instanceOperations().getTabletServers().size() == 1) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsSplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsSplitIT.java
index 931bf5b..5583793 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsSplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsSplitIT.java
@@ -85,21 +85,18 @@ public class DeleteRowsSplitIT extends AccumuloClusterHarness {
// initiate the delete range
final boolean fail[] = {false};
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- // split the table
- final SortedSet<Text> afterEnd = SPLITS.tailSet(new Text(end + "\0"));
- client.tableOperations().addSplits(tableName, afterEnd);
- } catch (Exception ex) {
- log.error("Exception", ex);
- synchronized (fail) {
- fail[0] = true;
- }
+ Thread t = new Thread(() -> {
+ try {
+ // split the table
+ final SortedSet<Text> afterEnd = SPLITS.tailSet(new Text(end + "\0"));
+ client.tableOperations().addSplits(tableName, afterEnd);
+ } catch (Exception ex) {
+ log.error("Exception", ex);
+ synchronized (fail) {
+ fail[0] = true;
}
}
- };
+ });
t.start();
sleepUninterruptibly(test * 2, TimeUnit.MILLISECONDS);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index 9171016..5a9ceda 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -352,16 +352,13 @@ public class ReadWriteIT extends AccumuloClusterHarness {
int i;
for (i = 0; i < ROWS; i += CHUNKSIZE) {
final int start = i;
- Thread verify = new Thread() {
- @Override
- public void run() {
- try {
- verify(accumuloClient, getClientInfo(), CHUNKSIZE, 1, 50, start, tableName);
- } catch (Exception ex) {
- fail.set(true);
- }
+ Thread verify = new Thread(() -> {
+ try {
+ verify(accumuloClient, getClientInfo(), CHUNKSIZE, 1, 50, start, tableName);
+ } catch (Exception ex) {
+ fail.set(true);
}
- };
+ });
verify.start();
ingest(accumuloClient, getClientInfo(), CHUNKSIZE, 1, 50, i + CHUNKSIZE, tableName);
verify.join();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
index 9dab761..ccf9597 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
@@ -82,17 +82,14 @@ public class ShutdownIT extends ConfigurableMacBase {
c.tableOperations().create("table" + i);
}
final AtomicReference<Exception> ref = new AtomicReference<>();
- Thread async = new Thread() {
- @Override
- public void run() {
- try {
- for (int i = 0; i < 10; i++)
- c.tableOperations().delete("table" + i);
- } catch (Exception ex) {
- ref.set(ex);
- }
+ Thread async = new Thread(() -> {
+ try {
+ for (int i = 0; i < 10; i++)
+ c.tableOperations().delete("table" + i);
+ } catch (Exception ex) {
+ ref.set(ex);
}
- };
+ });
async.start();
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZooCacheIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ZooCacheIT.java
index 05b6db3..ee89fc2 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZooCacheIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZooCacheIT.java
@@ -56,17 +56,14 @@ public class ZooCacheIT extends ConfigurableMacBase {
final AtomicReference<Exception> ref = new AtomicReference<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 3; i++) {
- Thread reader = new Thread() {
- @Override
- public void run() {
- try (AccumuloClient client = createClient()) {
- CacheTestReader.main(new String[] {pathName, testDir.getAbsolutePath(),
- ClientInfo.from(client.properties()).getZooKeepers()});
- } catch (Exception ex) {
- ref.set(ex);
- }
+ Thread reader = new Thread(() -> {
+ try (AccumuloClient client = createClient()) {
+ CacheTestReader.main(new String[] {pathName, testDir.getAbsolutePath(),
+ ClientInfo.from(client.properties()).getZooKeepers()});
+ } catch (Exception ex) {
+ ref.set(ex);
}
- };
+ });
reader.start();
threads.add(reader);
}