Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/02/14 15:43:36 UTC

[GitHub] [accumulo] ctubbsii commented on a change in pull request #2490: Changes to ExternalCompaction ITs

ctubbsii commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r805952905



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;

Review comment:
       Some of this could be tightened up for readability:
   ```suggestion
       var client = ThriftUtil.getClient(new CompactionCoordinatorService.Client.Factory(),
           coordinatorHost.get(), context);
       try {
         return client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
   ```
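The tightened version still returns the client to the pool, because a `finally` block runs after the `return` expression has been evaluated. Below is a minimal sketch of that control flow. It uses hypothetical `FakeClient`/`FakePool` stand-ins, not the real `ThriftUtil` API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReturnInTryDemo {

  // Hypothetical stand-in for a pooled Thrift client; not an Accumulo class.
  static final class FakeClient {
    String getRunningCompactions() {
      return "running-compactions";
    }
  }

  // Hypothetical pool that counts how many clients are currently checked out.
  static final class FakePool {
    final Deque<FakeClient> idle = new ArrayDeque<>();
    int checkedOut = 0;

    FakeClient getClient() {
      checkedOut++;
      return idle.isEmpty() ? new FakeClient() : idle.pop();
    }

    void returnClient(FakeClient client) {
      checkedOut--;
      idle.push(client);
    }
  }

  // Mirrors the tightened shape: the RPC result is returned directly from the
  // try block, and the finally block still hands the client back to the pool.
  static String getRunningCompactions(FakePool pool) {
    var client = pool.getClient();
    try {
      return client.getRunningCompactions();
    } finally {
      pool.returnClient(client);
    }
  }

  public static void main(String[] args) {
    FakePool pool = new FakePool();
    String result = getRunningCompactions(pool);
    // Prints "running-compactions checkedOut=0": the client went back to the pool.
    System.out.println(result + " checkedOut=" + pool.checkedOut);
  }
}
```

Removing the `running` local therefore changes readability only, not resource handling.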

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);

Review comment:
      Sleeping for an arbitrary amount of time tends to produce very flaky tests, especially in slower test environments. Could this instead wait for an observable condition, up to a maximum timeout, rather than hoping it's ready after 1 second?
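Below is a minimal sketch of such a bounded wait. It assumes a hypothetical `waitFor` helper, not an existing Accumulo utility. In `startCoordinator`, the condition might be "a `getRunningCompactions` call succeeds", which would replace both the fixed one-second sleep and the unbounded retry loop.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public final class WaitFor {

  private WaitFor() {}

  /**
   * Polls {@code condition} every {@code pollInterval} until it returns true, throwing
   * TimeoutException if {@code timeout} elapses first. (Hypothetical helper.)
   */
  public static void waitFor(BooleanSupplier condition, Duration timeout, Duration pollInterval)
      throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      // Overflow-safe comparison against the nanoTime deadline.
      if (System.nanoTime() - deadline >= 0) {
        throw new TimeoutException("Condition not met within " + timeout);
      }
      Thread.sleep(pollInterval.toMillis());
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.nanoTime();
    // Example condition that becomes true roughly 200 ms after start.
    waitFor(() -> System.nanoTime() - start > Duration.ofMillis(200).toNanos(),
        Duration.ofSeconds(5), Duration.ofMillis(50));
    System.out.println("condition met");
  }
}
```

With this approach, a slow environment just polls a few more times. A coordinator that never comes up fails the test with a clear timeout instead of hanging.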

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {

Review comment:
      A while back, somebody (I think @jmark99) went through and standardized all our code on `x == null` instead of `null == x`. It'd be good to preserve that consistency going forward. We don't currently have an automated way to enforce this, but I think you might be one of the few who use this pattern.
   
   ```suggestion
         while (metrics == null) {
   ```

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
      This isn't shorter, but I've found IntStream semantics more readable in some cases. The following might be a suitable replacement; it would be shorter if `exec` didn't throw a checked exception.
   ```suggestion
         IntStream.range(compactorProcesses.size(), cluster.getConfig().getNumCompactors())
             .limit(limit).forEach(i -> {
           try {
             compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
           } catch (IOException e) {
             throw new UncheckedIOException(e);
           }
         });
   ```
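Below is a minimal sketch showing that the loop and the suggested `IntStream` form start the same processes. It uses a hypothetical `exec(int)` stand-in for `cluster.exec(...)`, which takes the index only so the output is distinguishable. Like the real call, it declares a checked `IOException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class StartCompactorsSketch {

  // Hypothetical stand-in for cluster.exec(...); declares a checked exception like the real call.
  static String exec(int i) throws IOException {
    return "compactor-" + i;
  }

  // Loop form from the diff: start at the current size and stop after `limit` new
  // processes or at the configured maximum, whichever comes first.
  static List<String> loopForm(List<String> processes, int limit, int max) throws IOException {
    int count = 0;
    for (int i = processes.size(); count < limit && i < max; i++, ++count) {
      processes.add(exec(i));
    }
    return processes;
  }

  // Stream form from the suggestion: the same index range, capped with limit().
  static List<String> streamForm(List<String> processes, int limit, int max) {
    IntStream.range(processes.size(), max).limit(limit).forEach(i -> {
      try {
        processes.add(exec(i));
      } catch (IOException e) {
        // IntConsumer cannot throw checked exceptions, so wrap and rethrow.
        throw new UncheckedIOException(e);
      }
    });
    return processes;
  }

  public static void main(String[] args) throws IOException {
    List<String> viaLoop = loopForm(new ArrayList<>(List.of("existing")), 2, 4);
    List<String> viaStream = streamForm(new ArrayList<>(List.of("existing")), 2, 4);
    // Prints "true [existing, compactor-1, compactor-2]"
    System.out.println(viaLoop.equals(viaStream) + " " + viaStream);
  }
}
```

One difference to keep in mind: `IntStream.limit` throws `IllegalArgumentException` for a negative argument, whereas the loop simply starts nothing. If `limit` could ever be negative, the two forms are not identical.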

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
      Is the cast to `MiniAccumuloClusterImpl` still necessary here, or is the cluster control reachable from the IT superclass without it? I don't recall other tests doing this, so it might no longer be needed.
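If the superclass does not already expose a mini-cluster-typed accessor, one general way to avoid repeating the cast in every test is a covariant override. The shared base declares a narrower return type once, so subclasses call mini-cluster methods directly. This is a minimal sketch with hypothetical class names that only echo the review. It does not describe Accumulo's actual test hierarchy.

```java
// Hypothetical classes illustrating a covariant accessor; not Accumulo's real test base classes.
public class CovariantAccessorSketch {

  interface Cluster {
    String name();
  }

  static class MiniCluster implements Cluster {
    @Override
    public String name() {
      return "mini";
    }

    // Only the mini-cluster implementation exposes this accessor.
    String getClusterControl() {
      return "mini-control";
    }
  }

  // Generic base: callers only see the Cluster interface.
  abstract static class TestBase {
    protected abstract Cluster getCluster();
  }

  // Narrows the return type (a covariant override), so subclasses get a
  // MiniCluster without casting.
  static class MiniClusterTestBase extends TestBase {
    private final MiniCluster cluster = new MiniCluster();

    @Override
    protected MiniCluster getCluster() {
      return cluster;
    }
  }

  static class SomeIT extends MiniClusterTestBase {
    String control() {
      // No cast needed: getCluster() is statically typed as MiniCluster here.
      return getCluster().getClusterControl();
    }
  }

  public static void main(String[] args) {
    System.out.println(new SomeIT().control());
  }
}
```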

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
##########
@@ -162,13 +169,14 @@ public void testMetrics() throws Exception {
         }
       } while (count > 0);
 
-      ExternalCompactionTestUtils.verify(client, table1, 7);
-      ExternalCompactionTestUtils.verify(client, table2, 13);
+      verify(client, table1, 7);
+      verify(client, table2, 13);
 
     } finally {
-      ExternalCompactionTestUtils.stopProcesses(c1, c2, coord);
       // We stopped the TServer and started our own, restart the original TabletServers
-      ((MiniAccumuloClusterImpl) getCluster()).getClusterControl().start(ServerType.TABLET_SERVER);
+      // TODO: Uncomment this if other tests are added.

Review comment:
       ```suggestion
         // Uncomment this if other tests are added.
   ```

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);

Review comment:
      Do all exceptions warrant simply sleeping and retrying? Some exceptions might mean this can never succeed, which would leave this thread retrying indefinitely.
   
   Also, it'd be good to log the exception, at least at debug level.
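Below is a minimal sketch of a retry loop that addresses both points. It retries only exceptions a caller-supplied predicate classifies as transient, logs each one at DEBUG, and gives up after a deadline. `BoundedRetry` is a hypothetical helper, and it uses `java.lang.System.Logger` only to stay dependency-free. Which exceptions count as transient is an assumption left to the caller. Treating only transport-level failures, such as the `TTransportException` thrown in `getRunningCompactions`, as retryable is one plausible choice.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

public final class BoundedRetry {

  private static final System.Logger LOG = System.getLogger(BoundedRetry.class.getName());

  private BoundedRetry() {}

  /**
   * Calls {@code attempt} until it returns a non-null value. Exceptions accepted by
   * {@code isRetryable} are logged at DEBUG and retried; any other exception is rethrown
   * immediately. Throws TimeoutException once {@code timeout} has elapsed.
   */
  public static <T> T retry(Callable<T> attempt, Predicate<Exception> isRetryable,
      Duration timeout, Duration pollInterval) throws Exception {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      try {
        T result = attempt.call();
        if (result != null) {
          return result;
        }
      } catch (Exception e) {
        if (!isRetryable.test(e)) {
          throw e; // cannot succeed by retrying, so fail fast
        }
        LOG.log(System.Logger.Level.DEBUG, "Attempt failed, will retry", e);
      }
      if (System.nanoTime() - deadline >= 0) {
        throw new TimeoutException("Gave up after " + timeout);
      }
      Thread.sleep(pollInterval.toMillis());
    }
  }

  public static void main(String[] args) throws Exception {
    int[] attempts = {0};
    String result = retry(() -> {
      attempts[0]++;
      if (attempts[0] < 3) {
        throw new IllegalStateException("not ready yet");
      }
      return "ready";
    }, e -> e instanceof IllegalStateException, Duration.ofSeconds(5), Duration.ofMillis(50));
    // Prints "ready after 3 attempts"
    System.out.println(result + " after " + attempts[0] + " attempts");
  }
}
```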




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
