You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2020/06/15 19:02:02 UTC

[GitHub] [accumulo] milleruntime commented on a change in pull request #1605: Fixes #564 adds support multiple compaction executors

milleruntime commented on a change in pull request #1605:
URL: https://github.com/apache/accumulo/pull/1605#discussion_r440381804



##########
File path: core/src/main/java/org/apache/accumulo/core/client/admin/compaction/TooManyDeletesSelector.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.core.client.admin.compaction;
+
+import static org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer.DELETES_STAT;
+import static org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer.TOTAL_STAT;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.rfile.RFile.WriterOptions;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer;
+
+/**
+ * This compaction selector works in concert with the {@link DeletesSummarizer}. Using the
+ * statistics from DeletesSummarizer this strategy will compact all files in a table when the number
+ * of deletes/non-deletes exceeds a threshold.
+ *
+ * <p>
+ * This strategy has two options. First the {@value #THRESHOLD_OPT} option allows setting the point
+ * at which a compaction will be triggered. This option defaults to {@value #THRESHOLD_OPT_DEFAULT}
+ * and must be in the range (0.0, 1.0]. The second option is {@value #PROCEED_ZERO_NO_SUMMARY_OPT}
+ * which determines if the strategy should proceed when a bulk imported file has no summary
+ * information.
+ *
+ * <p>
+ * If the delete summarizer was configured on a table that already had files, then those files will
+ * have no summary information. This strategy can still proceed in this situation. It will fall
+ * back to using Accumulo's estimated entries per file in this case. For the files without summary
+ * information the estimated number of deletes will be zero. This fall back method will
+ * underestimate deletes which will not lead to false positives, except for the case of bulk
+ * imported files. Accumulo estimates that bulk imported files have zero entries. The second option
+ * {@value #PROCEED_ZERO_NO_SUMMARY_OPT} determines if this strategy should proceed when it sees
+ * bulk imported files that do not have summary data. This option defaults to
+ * {@value #PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT}.
+ *
+ * <p>
+ * Bulk files can be generated with summary information by calling
+ * {@code AccumuloFileOutputFormat#setSummarizers(JobConf, SummarizerConfiguration...)} or
+ * {@link WriterOptions#withSummarizers(SummarizerConfiguration...)}
+ *
+ * <p>
+ * When using this feature, it's important to ensure summary cache is on and the summaries fit in the
+ * cache.
+ *
+ * @since 2.1.0
+ */
+public class TooManyDeletesSelector implements CompactionSelector {
+
+  private double threshold;
+
+  private boolean proceed_bns;
+
+  /**
+   * This option should be a floating point number in the range (0.0, 1.0].
+   */
+  public static final String THRESHOLD_OPT = "threshold";
+
+  /**
+   * The default threshold.
+   */
+  public static final String THRESHOLD_OPT_DEFAULT = ".25";
+
+  public static final String PROCEED_ZERO_NO_SUMMARY_OPT = "proceed_zero_no_summary";
+
+  public static final String PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT = "false";
+
+  @Override
+  public void init(InitParamaters iparams) {
+    var options = iparams.getOptions();
+    this.threshold = Double.parseDouble(options.getOrDefault(THRESHOLD_OPT, THRESHOLD_OPT_DEFAULT));
+    if (threshold <= 0.0 || threshold > 1.0) {
+      throw new IllegalArgumentException(
+          "Threshold must be in range (0.0, 1.0], saw : " + threshold);
+    }
+
+    this.proceed_bns = Boolean.parseBoolean(
+        options.getOrDefault(PROCEED_ZERO_NO_SUMMARY_OPT, PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT));
+  }
+
+  @Override
+  public Selection select(SelectionParameters sparams) {
+
+    var tableConf = sparams.getEnvironment().getConfiguration(sparams.getTableId());
+
+    // TODO ISSUE could add a method to get props with prefix. That could be used to efficiently get

Review comment:
       Open issue for TODO 

##########
File path: core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionSelector.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client.admin.compaction;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.PluginEnvironment;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * This class select which files a user compaction will compact. It can also be configured per table

Review comment:
       Typo
   ```suggestion
    * This class selects which files a user compaction will compact. It can also be configured per table
   ```

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlan;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner.PlanningParameters;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.ExecutorManager;
+import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
+import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CompactionService {
+  // TODO ISSUE move rate limiters to the compaction service level.
+  private final CompactionPlanner planner;
+  private final Map<CompactionExecutorId,CompactionExecutor> executors;
+  private final CompactionServiceId myId;
+  private Map<KeyExtent,Collection<SubmittedJob>> submittedJobs = new ConcurrentHashMap<>();
+  private ServerContext serverCtx;
+
+  private static final Logger log = LoggerFactory.getLogger(CompactionService.class);
+
+  // TODO ISSUE change thread pool sizes if compaction service config changes
+  public CompactionService(String serviceName, String plannerClass,
+      Map<String,String> serviceOptions, ServerContext sctx, TabletServerResourceManager tsrm) {
+
+    this.myId = CompactionServiceId.of(serviceName);
+    this.serverCtx = sctx;
+
+    try {
+      planner =
+          ConfigurationTypeHelper.getClassInstance(null, plannerClass, CompactionPlanner.class);
+    } catch (IOException | ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+
+    Map<CompactionExecutorId,CompactionExecutor> tmpExecutors = new HashMap<>();
+
+    planner.init(new CompactionPlanner.InitParameters() {
+
+      @Override
+      public ServiceEnvironment getServiceEnvironment() {
+        return new ServiceEnvironmentImpl(sctx);
+      }
+
+      @Override
+      public Map<String,String> getOptions() {
+        return serviceOptions;
+      }
+
+      @Override
+      public ExecutorManager getExecutorManager() {
+        return new ExecutorManager() {
+          @Override
+          public CompactionExecutorId createExecutor(String executorName, int threads) {
+            var ceid = CompactionExecutorId.of(serviceName + "." + executorName);
+            Preconditions.checkState(!tmpExecutors.containsKey(ceid));
+            tmpExecutors.put(ceid, new CompactionExecutor(ceid, threads, tsrm));
+            return ceid;
+          }
+        };
+      }
+
+      @Override
+      public String getFullyQualifiedOption(String key) {
+        return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceName + ".opts." + key;
+      }
+    });
+
+    this.executors = Map.copyOf(tmpExecutors);
+
+    log.debug("Created new compaction service id:{} executors:{}", myId, executors.keySet());
+  }
+
+  private boolean reconcile(Set<CompactionJob> jobs, Collection<SubmittedJob> submitted) {
+    for (SubmittedJob submittedJob : submitted) {
+      // only read status once to avoid race conditions since multiple compares are done
+      var status = submittedJob.getStatus();
+      if (status == Status.QUEUED) {
+        if (!jobs.remove(submittedJob.getJob())) {
+          if (!submittedJob.cancel(Status.QUEUED)) {
+            return false;
+          }
+        }
+      } else if (status == Status.RUNNING) {
+        for (CompactionJob job : jobs) {
+          if (!Collections.disjoint(submittedJob.getJob().getFiles(), job.getFiles())) {
+            return false;
+          }
+        }
+      }
+    }
+
+    return true;
+  }
+
+  public void compact(CompactionKind kind, Compactable compactable,
+      Consumer<Compactable> completionCallback) {
+    // TODO ISSUE this could take a while... could run this in a thread pool

Review comment:
       Open issue for TODO

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlan;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner.PlanningParameters;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.ExecutorManager;
+import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
+import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CompactionService {
+  // TODO ISSUE move rate limiters to the compaction service level.

Review comment:
       Open issue for TODO

##########
File path: server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
##########
@@ -467,14 +465,11 @@ public String invalidMessage(String argument) {
       }
       case TABLE_COMPACT: {
         TableOperation tableOp = TableOperation.COMPACT;
-        validateArgumentCount(arguments, tableOp, 5);
+        // TODO ISSUE could have compatibility mode for the old number of args

Review comment:
       Open Issue for TODO

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlan;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner.PlanningParameters;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.ExecutorManager;
+import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
+import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CompactionService {
+  // TODO ISSUE move rate limiters to the compaction service level.
+  private final CompactionPlanner planner;
+  private final Map<CompactionExecutorId,CompactionExecutor> executors;
+  private final CompactionServiceId myId;
+  private Map<KeyExtent,Collection<SubmittedJob>> submittedJobs = new ConcurrentHashMap<>();
+  private ServerContext serverCtx;
+
+  private static final Logger log = LoggerFactory.getLogger(CompactionService.class);
+
+  // TODO ISSUE change thread pool sizes if compaction service config changes

Review comment:
       Open issue for TODO




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org