You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2019/09/05 20:03:26 UTC

[accumulo] 01/02: Merge branch '1.9' into 2.0

This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 81027c047e0dcede854c0cb8c99fe8af9e3772c3
Merge: d04109d bfc4dcf
Author: Ed Coleman <de...@etcoleman.com>
AuthorDate: Thu Sep 5 16:02:17 2019 -0400

    Merge branch '1.9' into 2.0

 .../java/org/apache/accumulo/master/tableOps/compact/CompactRange.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactRange.java
index 4de8a3e,0000000..f42e99e
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactRange.java
@@@ -1,185 -1,0 +1,186 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps.compact;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static java.util.Objects.requireNonNull;
 +
 +import java.util.List;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
 +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 +import org.apache.accumulo.core.clientImpl.CompactionStrategyConfigUtil;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.master.tableOps.MasterRepo;
 +import org.apache.accumulo.master.tableOps.Utils;
 +import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
 +import org.apache.commons.codec.binary.Hex;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.WritableUtils;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * FATE operation that starts a user-requested compaction of a table, optionally limited to a row
 + * range.
 + *
 + * <p>
 + * The constructor validates the request and serializes any user-supplied iterators and compaction
 + * strategy. {@link #call(long, Master)} bumps the counter stored in the table's
 + * {@link Constants#ZTABLE_COMPACT_ID} node in ZooKeeper, records the serialized configuration
 + * under this transaction's id, and returns a {@link CompactionDriver} as the next step.
 + *
 + * <p>
 + * The node value handled by this class has the form
 + * {@code <counter>[,<16-digit hex txid>=<hex encoded config>]...}.
 + */
 +public class CompactRange extends MasterRepo {
 +  private static final Logger log = LoggerFactory.getLogger(CompactRange.class);
 +
 +  private static final long serialVersionUID = 1L;
 +  private final TableId tableId;
 +  private final NamespaceId namespaceId;
 +  // null when the caller supplied no row (null or empty array)
 +  private byte[] startRow;
 +  private byte[] endRow;
 +  // serialized UserCompactionConfig; left null when no iterators and the default strategy are used
 +  private byte[] config;
 +
 +  /**
 +   * Validates the compaction request and captures its parameters.
 +   *
 +   * @param namespaceId
 +   *          namespace of the table; must not be null
 +   * @param tableId
 +   *          table to compact; must not be null
 +   * @param startRow
 +   *          first row of the range; null or empty means no start row
 +   * @param endRow
 +   *          last row of the range; null or empty means no end row
 +   * @param iterators
 +   *          iterators to apply during the compaction; must not be null, may be empty
 +   * @param compactionStrategy
 +   *          compaction strategy to use; must not be null
 +   * @throws AcceptableThriftTableOperationException
 +   *           with type {@code BAD_RANGE} when both rows are given and the start row is not
 +   *           strictly less than the end row
 +   */
 +  public CompactRange(NamespaceId namespaceId, TableId tableId, byte[] startRow, byte[] endRow,
 +      List<IteratorSetting> iterators, CompactionStrategyConfig compactionStrategy)
 +      throws AcceptableThriftTableOperationException {
 +
 +    requireNonNull(namespaceId, "Invalid argument: null namespaceId");
 +    requireNonNull(tableId, "Invalid argument: null tableId");
 +    requireNonNull(iterators, "Invalid argument: null iterator list");
 +    requireNonNull(compactionStrategy, "Invalid argument: null compactionStrategy");
 +
 +    this.tableId = tableId;
 +    this.namespaceId = namespaceId;
 +    // Normalize "no row" to null; a null argument is tolerated instead of causing an NPE.
 +    this.startRow = emptyToNull(startRow);
 +    this.endRow = emptyToNull(endRow);
 +
 +    // Reject an invalid range before spending any effort on serializing the configuration.
 +    if (this.startRow != null && this.endRow != null
 +        && new Text(this.startRow).compareTo(new Text(this.endRow)) >= 0) {
 +      throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
 +          TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
 +          "start row must be less than end row");
 +    }
 +
 +    if (!iterators.isEmpty()
 +        || !compactionStrategy.equals(CompactionStrategyConfigUtil.DEFAULT_STRATEGY)) {
 +      this.config = WritableUtils.toByteArray(
 +          new UserCompactionConfig(this.startRow, this.endRow, iterators, compactionStrategy));
 +    } else {
-       log.info("No iterators or compaction strategy");
++      log.debug(
++          "Using default compaction strategy. No user iterators or compaction strategy provided.");
 +    }
 +  }
 +
 +  /** Returns {@code null} for a null or zero-length row, otherwise the row itself. */
 +  private static byte[] emptyToNull(byte[] row) {
 +    return row == null || row.length == 0 ? null : row;
 +  }
 +
 +  /** Builds the ZooKeeper path of the compaction id node for the given table. */
 +  private static String compactIdPath(Master master, TableId tableId) {
 +    return Constants.ZROOT + "/" + master.getInstanceID() + Constants.ZTABLES + "/" + tableId
 +        + Constants.ZTABLE_COMPACT_ID;
 +  }
 +
 +  /** Formats a transaction id the way it is stored in the compaction id node. */
 +  private static String formatTxid(long txid) {
 +    return String.format("%016x", txid);
 +  }
 +
 +  /**
 +   * Reserves the namespace and the table for this transaction.
 +   *
 +   * @return the sum of the values returned by the two reservation calls (presumably zero once
 +   *         both reservations are held; confirm against {@code Utils})
 +   */
 +  @Override
 +  public long isReady(long tid, Master env) throws Exception {
 +    return Utils.reserveNamespace(env, namespaceId, tid, false, true, TableOperation.COMPACT)
 +        + Utils.reserveTable(env, tableId, tid, false, true, TableOperation.COMPACT);
 +  }
 +
 +  /**
 +   * Increments the counter in the table's compaction id node and, when a configuration was
 +   * serialized by the constructor, stores it in the node under this transaction's id.
 +   *
 +   * @return a {@link CompactionDriver} created with the incremented counter value
 +   * @throws AcceptableThriftTableOperationException
 +   *           with type {@code NOTFOUND} when the node does not exist; the mutator raises one with
 +   *           type {@code OTHER} when the node already holds an entry of another transaction
 +   */
 +  @Override
 +  public Repo<Master> call(final long tid, Master env) throws Exception {
 +    String zTablePath = compactIdPath(env, tableId);
 +    IZooReaderWriter zoo = env.getContext().getZooReaderWriter();
 +    final String txidString = formatTxid(tid);
 +
 +    byte[] cid;
 +    try {
 +      cid = zoo.mutate(zTablePath, null, null, new Mutator() {
 +        @Override
 +        public byte[] mutate(byte[] currentValue) throws Exception {
 +          String[] tokens = new String(currentValue, UTF_8).split(",");
 +          long flushID = Long.parseLong(tokens[0]) + 1;
 +
 +          // Every token after the counter is an entry of some transaction. An entry that is not
 +          // ours means another compaction has already registered its configuration.
 +          for (int i = 1; i < tokens.length; i++) {
 +            if (tokens[i].startsWith(txidString)) {
 +              continue; // skip self
 +            }
 +
 +            log.debug("txidString : {}", txidString);
 +            log.debug("tokens[{}] : {}", i, tokens[i]);
 +
 +            throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
 +                TableOperation.COMPACT, TableOperationExceptionType.OTHER,
 +                "Another compaction with iterators and/or a compaction strategy is running");
 +          }
 +
 +          // Our own previous entry (if any) is not copied; it is rewritten from config below.
 +          StringBuilder newValue = new StringBuilder(Long.toString(flushID));
 +          if (config != null) {
 +            newValue.append(',').append(txidString).append('=');
 +            newValue.append(Hex.encodeHexString(config));
 +          }
 +
 +          return newValue.toString().getBytes(UTF_8);
 +        }
 +      });
 +
 +      return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), namespaceId,
 +          tableId, startRow, endRow);
 +    } catch (NoNodeException nne) {
 +      throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
 +          TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
 +    }
 +
 +  }
 +
 +  /**
 +   * Removes the entry of the given transaction from the table's compaction id node, leaving the
 +   * counter and the entries of other transactions in place.
 +   *
 +   * @param environment
 +   *          master used to reach ZooKeeper
 +   * @param txid
 +   *          transaction whose entry is removed
 +   * @param tableId
 +   *          table whose compaction id node is rewritten
 +   */
 +  static void removeIterators(Master environment, final long txid, TableId tableId)
 +      throws Exception {
 +    String zTablePath = compactIdPath(environment, tableId);
 +    IZooReaderWriter zoo = environment.getContext().getZooReaderWriter();
 +    final String txidString = formatTxid(txid);
 +
 +    zoo.mutate(zTablePath, null, null, new Mutator() {
 +      @Override
 +      public byte[] mutate(byte[] currentValue) {
 +        String[] tokens = new String(currentValue, UTF_8).split(",");
 +        long flushID = Long.parseLong(tokens[0]);
 +
 +        StringBuilder newValue = new StringBuilder(Long.toString(flushID));
 +        for (int i = 1; i < tokens.length; i++) {
 +          if (!tokens[i].startsWith(txidString)) {
 +            newValue.append(',').append(tokens[i]);
 +          }
 +        }
 +
 +        return newValue.toString().getBytes(UTF_8);
 +      }
 +    });
 +
 +  }
 +
 +  /**
 +   * Rolls this step back: removes this transaction's entry from the compaction id node and
 +   * releases the namespace and table reservations.
 +   */
 +  @Override
 +  public void undo(long tid, Master env) throws Exception {
 +    try {
 +      removeIterators(env, tid, tableId);
 +    } finally {
 +      // Nested so the table reservation is released even if releasing the namespace fails.
 +      try {
 +        Utils.unreserveNamespace(env, namespaceId, tid, false);
 +      } finally {
 +        Utils.unreserveTable(env, tableId, tid, false);
 +      }
 +    }
 +  }
 +
 +}