You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/02/14 14:36:34 UTC

[GitHub] [accumulo] dlmarion opened a new pull request #2490: Changes to ExternalCompaction ITs

dlmarion opened a new pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490


   Added CompactionCoordinator and Compactor to MiniAccumuloCluster. Modified ITs
   to extend SharedMiniClusterBase so that MAC was not being restarted for each
   test method.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806155194



##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
##########
@@ -162,13 +169,14 @@ public void testMetrics() throws Exception {
         }
       } while (count > 0);
 
-      ExternalCompactionTestUtils.verify(client, table1, 7);
-      ExternalCompactionTestUtils.verify(client, table2, 13);
+      verify(client, table1, 7);
+      verify(client, table2, 13);
 
     } finally {
-      ExternalCompactionTestUtils.stopProcesses(c1, c2, coord);
       // We stopped the TServer and started our own, restart the original TabletServers
-      ((MiniAccumuloClusterImpl) getCluster()).getClusterControl().start(ServerType.TABLET_SERVER);
+      // TODO: Uncomment this if other tests are added.

Review comment:
       Removed the todo in [851c2c3]




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion merged pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion merged pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806155025



##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
       Added startCoordinator and startCompactors to ClusterControl interface in [851c2c3] to remove casting. Throw UnsupportedOperationException for these methods in StandaloneClusterControl as they are not implemented yet.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806192371



##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
       I saw the one that was in comments. Fixed in 47981eb. Some of the other casts required more work to add the missing methods to the ClusterControl interface.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r805952905



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;

Review comment:
       Some of this could be tightened up for readability:
   ```suggestion
       var client = ThriftUtil.getClient(new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
       try {
         return client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
   ```

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);

Review comment:
       Sleeping for an arbitrary amount of time tends to result in very flaky tests, especially on slower test environments. Is it possible we can try to make this wait on a resulting condition, up to a max timeout, rather than hope that it's ready after 1 second?

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {

Review comment:
       Awhile back, somebody (I think @jmark99) went through and standardized all our code on `x == null` instead of `null == x`. It'd be good to try to preserve that consistency going forward. We don't currently have an automated way to enforce this, but I think you might be one of the few who use this pattern.
   
   ```suggestion
         while (metrics == null) {
   ```

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
       This isn't shorter, but I have found the IntStream semantics to be more readable in some cases. The following might be a suitable replacement, but it is a bit longer. It'd be shorter if there weren't any checked exceptions thrown.
   ```suggestion
         IntStream.range(compactorProcesses.size(), cluster.getConfig().getNumCompactors()).limit(limit).forEach(i -> {
           try {
             compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
           } catch (IOException e) {
             throw new UncheckedIOException(e);
           }
         });
   ```

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
       Is the casting to `MiniAccumuloClusterImpl` still necessary here, or is it reachable from the IT superclass without that? I don't recall this being done in other tests. It might not be necessary anymore.

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
##########
@@ -162,13 +169,14 @@ public void testMetrics() throws Exception {
         }
       } while (count > 0);
 
-      ExternalCompactionTestUtils.verify(client, table1, 7);
-      ExternalCompactionTestUtils.verify(client, table2, 13);
+      verify(client, table1, 7);
+      verify(client, table2, 13);
 
     } finally {
-      ExternalCompactionTestUtils.stopProcesses(c1, c2, coord);
       // We stopped the TServer and started our own, restart the original TabletServers
-      ((MiniAccumuloClusterImpl) getCluster()).getClusterControl().start(ServerType.TABLET_SERVER);
+      // TODO: Uncomment this if other tests are added.

Review comment:
       ```suggestion
         // Uncomment this if other tests are added.
   ```

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);

Review comment:
       Do all exceptions warrant merely sleeping and retrying? It seems to me that some exceptions might result in this never being able to succeed, and us leaving this thread waiting and retrying indefinitely.
   
   Also, it'd be good to log the exception, at least at debug.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806156391



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);

Review comment:
       I modified this to only catch TException in [851c2c3]. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806166479



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
       That's fine. I think the main issue is comprehensibility around the juggling of multiple loop variables. Here's another way this could be made more intuitive:
   
   ```suggestion
         int count = Math.min(limit, cluster.getConfig().getNumCompactors() - compactorProcesses.size());
         for (int i = 0; i < count; i++) {
           compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
         }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806191071



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
       Applied change in 47981eb




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806155025



##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
       Added startCoordinator and startCompactors to ClusterControl interface in [851c2c3] to remove casting. Throw UnsupportedOperationException for these methods in StandaloneClusterControl as they are not implemented yet.
   

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
##########
@@ -162,13 +169,14 @@ public void testMetrics() throws Exception {
         }
       } while (count > 0);
 
-      ExternalCompactionTestUtils.verify(client, table1, 7);
-      ExternalCompactionTestUtils.verify(client, table2, 13);
+      verify(client, table1, 7);
+      verify(client, table2, 13);
 
     } finally {
-      ExternalCompactionTestUtils.stopProcesses(c1, c2, coord);
       // We stopped the TServer and started our own, restart the original TabletServers
-      ((MiniAccumuloClusterImpl) getCluster()).getClusterControl().start(ServerType.TABLET_SERVER);
+      // TODO: Uncomment this if other tests are added.

Review comment:
       Removed the todo in [851c2c3]

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
       I left this as-is in [851c2c3]. The old style for loop is well-known. The new stream-style is only more readable if you are familiar with it.

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);

Review comment:
       I modified this to only catch TException in [851c2c3]. 

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);

Review comment:
       I removed the sleep in [851c2c3]

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {

Review comment:
       I updated this in [851c2c3]

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;

Review comment:
       I tightened this up a bit in [851c2c3]

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
       Applied change in 47981eb

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
       I saw the one that was in comments. Fixed in 47981eb. Some of the other casts required more work to add the missing methods to the ClusterControl interface.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806156928



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {

Review comment:
       I updated this in [851c2c3]




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806156675



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);

Review comment:
       I removed the sleep in [851c2c3]




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806161066



##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
       Cool. I think there were a few other casts like this, including the commented out one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion merged pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion merged pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806155989



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
       I left this as-is in [851c2c3]. The old style for loop is well-known. The new stream-style is only more readable if you are familiar with it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r806157281



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;

Review comment:
       I tightened this up a bit in [851c2c3]




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2490: Changes to ExternalCompaction ITs

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2490:
URL: https://github.com/apache/accumulo/pull/2490#discussion_r805952905



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;

Review comment:
       Some of this could be tightened up for readability:
   ```suggestion
       var client = ThriftUtil.getClient(new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
       try {
         return client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
   ```

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);

Review comment:
       Sleeping for an arbitrary amount of time tends to result in very flaky tests, especially on slower test environments. Is it possible we can try to make this wait on a resulting condition, up to a max timeout, rather than hope that it's ready after 1 second?

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {

Review comment:
       Awhile back, somebody (I think @jmark99) went through and standardized all our code on `x == null` instead of `null == x`. It'd be good to try to preserve that consistency going forward. We don't currently have an automated way to enforce this, but I think you might be one of the few who use this pattern.
   
   ```suggestion
         while (metrics == null) {
   ```

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
       This isn't shorter, but I have found the IntStream semantics to be more readable in some cases. The following might be a suitable replacement, but it is a bit longer. It'd be shorter if there weren't any checked exceptions thrown.
   ```suggestion
         IntStream.range(compactorProcesses.size(), cluster.getConfig().getNumCompactors()).limit(limit).forEach(i -> {
           try {
             compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
           } catch (IOException e) {
             throw new UncheckedIOException(e);
           }
         });
   ```

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
       Is the casting to `MiniAccumuloClusterImpl` still necessary here, or is it reachable from the IT superclass without that? I don't recall this being done in other tests. It might not be necessary anymore.

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
##########
@@ -162,13 +169,14 @@ public void testMetrics() throws Exception {
         }
       } while (count > 0);
 
-      ExternalCompactionTestUtils.verify(client, table1, 7);
-      ExternalCompactionTestUtils.verify(client, table2, 13);
+      verify(client, table1, 7);
+      verify(client, table2, 13);
 
     } finally {
-      ExternalCompactionTestUtils.stopProcesses(c1, c2, coord);
       // We stopped the TServer and started our own, restart the original TabletServers
-      ((MiniAccumuloClusterImpl) getCluster()).getClusterControl().start(ServerType.TABLET_SERVER);
+      // TODO: Uncomment this if other tests are added.

Review comment:
       ```suggestion
         // Uncomment this if other tests are added.
   ```

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);

Review comment:
       Do all exceptions warrant merely sleeping and retrying? It seems to me that some exceptions might result in this never being able to succeed, and us leaving this thread waiting and retrying indefinitely.
   
   Also, it'd be good to log the exception, at least at debug.

##########
File path: test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
##########
@@ -75,15 +81,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
 
   @Test
   public void testProgress() throws Exception {
-    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
-      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
-      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
-      coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
-          CompactionCoordinator.class, getCluster().getServerContext());
+      createTable(client, table1, "cs1");
+      writeData(client, table1, ROWS);
+
+      ((MiniAccumuloClusterImpl) cluster).getClusterControl().startCompactors(Compactor.class, 1,

Review comment:
       Cool. I think there were a few other casts like this, including the commented out one.

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
##########
@@ -111,6 +127,53 @@ public void adminStopAll() throws IOException {
     }
   }
 
+  private static TExternalCompactionList getRunningCompactions(ClientContext context)
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context);
+    try {
+      TExternalCompactionList running =
+          client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
+      return running;
+    } finally {
+      ThriftUtil.returnClient(client, context);
+    }
+  }
+
+  public synchronized void startCoordinator(Class<? extends CompactionCoordinator> coordinator)
+      throws IOException {
+    if (coordinatorProcess == null) {
+      coordinatorProcess = cluster
+          ._exec(coordinator, ServerType.COMPACTION_COORDINATOR, new HashMap<>()).getProcess();
+      UtilWaitThread.sleep(1000);
+      // Wait for coordinator to start
+      TExternalCompactionList metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getRunningCompactions(cluster.getServerContext());
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+  }
+
+  public synchronized void startCompactors(Class<? extends Compactor> compactor, int limit,
+      String queueName) throws IOException {
+    synchronized (compactorProcesses) {
+      int count = 0;
+      for (int i = compactorProcesses.size();
+          count < limit && i < cluster.getConfig().getNumCompactors(); i++, ++count) {
+        compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
+      }

Review comment:
       That's fine. I think the main issue is comprehensibility around the juggling of multiple loop variables. Here's another way this could be made more intuitive:
   
   ```suggestion
         int count = Math.min(limit, cluster.getConfig().getNumCompactors() - compactorProcesses.size());
         for (int i = 0; i < count; i++) {
           compactorProcesses.add(cluster.exec(compactor, "-q", queueName).getProcess());
         }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org