You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2016/01/09 04:38:16 UTC

[13/19] accumulo git commit: Merge branch 'javadoc-jdk8-1.6' into javadoc-jdk8-1.7

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 01bd23a,0000000..bf582c7
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@@ -1,167 -1,0 +1,167 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.monitor.servlets;
 +
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import javax.servlet.http.HttpServletRequest;
 +import javax.servlet.http.HttpServletResponse;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 +import org.apache.accumulo.core.replication.ReplicationConstants;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.monitor.Monitor;
 +import org.apache.accumulo.monitor.util.Table;
 +import org.apache.accumulo.monitor.util.celltypes.NumberType;
 +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
 +import org.apache.accumulo.server.replication.ReplicationUtil;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 +import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + *
 + */
 +public class ReplicationServlet extends BasicServlet {
 +  private static final Logger log = LoggerFactory.getLogger(ReplicationServlet.class);
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  // transient because it's not serializable and servlets are serializable
 +  private transient volatile ReplicationUtil replicationUtil = null;
 +
 +  private synchronized ReplicationUtil getReplicationUtil() {
 +    // make transient replicationUtil available as needed
 +    if (replicationUtil == null) {
 +      replicationUtil = new ReplicationUtil(Monitor.getContext());
 +    }
 +    return replicationUtil;
 +  }
 +
 +  @Override
 +  protected String getTitle(HttpServletRequest req) {
 +    return "Replication Overview";
 +  }
 +
 +  @Override
 +  protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws Exception {
 +    final Connector conn = Monitor.getContext().getConnector();
 +    final MasterMonitorInfo mmi = Monitor.getMmi();
 +
 +    // The total number of "slots" we have to replicate data
 +    int totalWorkQueueSize = getReplicationUtil().getMaxReplicationThreads(mmi);
 +
 +    TableOperations tops = conn.tableOperations();
 +    if (!ReplicationTable.isOnline(conn)) {
 +      banner(sb, "", "Replication table is offline");
 +      return;
 +    }
 +
 +    Table replicationStats = new Table("replicationStats", "Replication Status");
 +    replicationStats.addSortableColumn("Table");
 +    replicationStats.addSortableColumn("Peer");
 +    replicationStats.addSortableColumn("Remote Identifier");
 +    replicationStats.addSortableColumn("ReplicaSystem Type");
 +    replicationStats.addSortableColumn("Files needing replication", new NumberType<Long>(), null);
 +
 +    Map<String,String> peers = getReplicationUtil().getPeers();
 +
 +    // The total set of configured targets
 +    Set<ReplicationTarget> allConfiguredTargets = getReplicationUtil().getReplicationTargets();
 +
 +    // Number of files per target we have to replicate
 +    Map<ReplicationTarget,Long> targetCounts = getReplicationUtil().getPendingReplications();
 +
 +    Map<String,String> tableNameToId = tops.tableIdMap();
 +    Map<String,String> tableIdToName = getReplicationUtil().invert(tableNameToId);
 +
 +    long filesPendingOverAllTargets = 0l;
 +    for (ReplicationTarget configuredTarget : allConfiguredTargets) {
 +      String tableName = tableIdToName.get(configuredTarget.getSourceTableId());
 +      if (null == tableName) {
 +        log.trace("Could not determine table name from id {}", configuredTarget.getSourceTableId());
 +        continue;
 +      }
 +
 +      String replicaSystemClass = peers.get(configuredTarget.getPeerName());
 +      if (null == replicaSystemClass) {
 +        log.trace("Could not determine configured ReplicaSystem for {}", configuredTarget.getPeerName());
 +        continue;
 +      }
 +
 +      Long numFiles = targetCounts.get(configuredTarget);
 +
 +      if (null == numFiles) {
 +        replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, 0);
 +      } else {
 +        replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, numFiles);
 +        filesPendingOverAllTargets += numFiles;
 +      }
 +    }
 +
 +    // Up to 2x the number of slots for replication available, WARN
 +    // More than 2x the number of slots for replication available, ERROR
 +    NumberType<Long> filesPendingFormat = new NumberType<Long>(Long.valueOf(0), Long.valueOf(2 * totalWorkQueueSize), Long.valueOf(0),
 +        Long.valueOf(4 * totalWorkQueueSize));
 +
 +    String utilization = filesPendingFormat.format(filesPendingOverAllTargets);
 +
-     sb.append("<div><center><br/><span class=\"table-caption\">Total files pending replication: ").append(utilization).append("</span></center></div>");
++    sb.append("<div><center><br /><span class=\"table-caption\">Total files pending replication: ").append(utilization).append("</span></center></div>");
 +
 +    replicationStats.generate(req, sb);
 +
 +    // Make a table for the replication data in progress
 +    Table replicationInProgress = new Table("replicationInProgress", "In-Progress Replication");
 +    replicationInProgress.addSortableColumn("File");
 +    replicationInProgress.addSortableColumn("Peer");
 +    replicationInProgress.addSortableColumn("Source Table ID");
 +    replicationInProgress.addSortableColumn("Peer Identifier");
 +    replicationInProgress.addUnsortableColumn("Status");
 +
 +    // Read the files from the workqueue in zk
 +    String zkRoot = ZooUtil.getRoot(Monitor.getContext().getInstance());
 +    final String workQueuePath = zkRoot + ReplicationConstants.ZOO_WORK_QUEUE;
 +
 +    DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, Monitor.getContext().getConfiguration());
 +
 +    try {
 +      for (String queueKey : workQueue.getWorkQueued()) {
 +        Entry<String,ReplicationTarget> queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey);
 +        String filename = queueKeyPair.getKey();
 +        ReplicationTarget target = queueKeyPair.getValue();
 +
 +        String path = getReplicationUtil().getAbsolutePath(conn, workQueuePath, queueKey);
 +        String progress = getReplicationUtil().getProgress(conn, path, target);
 +
 +        // Add a row in the table
 +        replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(),
 +            progress);
 +      }
 +    } catch (KeeperException | InterruptedException e) {
 +      log.warn("Could not calculate replication in progress", e);
 +    }
 +
 +    replicationInProgress.generate(req, sb);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 40cb604,fa0b68b..0e0089a
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@@ -43,12 -43,6 +43,12 @@@ public abstract class CompactionStrateg
     * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
     * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
     *
-    * <P>
++   * <p>
 +   * Called while holding the tablet lock, so it should not be doing any blocking.
 +   *
-    * <P>
++   * <p>
 +   * Since no blocking should be done in this method, it is unexpected that this method will throw IOException. However, since it is in the API, it cannot be
 +   * easily removed.
     */
    public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
  
@@@ -64,10 -58,6 +64,10 @@@
    /**
     * Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking.
     *
-    * <P>
++   * <p>
 +   * Since no blocking should be done in this method, it is unexpected that this method will throw IOException. However, since it is in the API, it cannot be
 +   * easily removed.
 +   *
     * @param request
     *          basic details about the tablet
     * @return the plan for a major compaction, or null to cancel the compaction.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
index 6afcdf5,0000000..fd19658
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
@@@ -1,38 -1,0 +1,39 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +/**
 + * A <a href="http://en.wikipedia.org/wiki/Merkle_tree">Merkle tree</a> is a hash tree and can be used to evaluate equality over large
 + * files with the ability to ascertain what portions of the files differ. Each leaf of the Merkle tree is some hash of a
 + * portion of the file, with each leaf corresponding to some "range" within the source file. As such, if all leaves are
 + * considered as ranges of the source file, the "sum" of all leaves creates a contiguous range over the entire file.
-  * <P>
++ * <p>
 + * The parent of any nodes (typically, a binary tree; however this is not required) is the concatenation of the hashes of
 + * the children. We can construct a full tree by walking up the tree, creating parents from children, until we have a root
 + * node. To check equality of two files that each have a Merkle tree built, we can very easily compare the value at the
 + * root of the Merkle tree to know whether or not the files are the same.
-  * <P>
++ * <p>
 + * Additionally, in the situation where we have two files which we expect to be the same but are not, we can walk back down
 + * the tree, finding subtrees that are equal and subtrees that are not. Subtrees that are equal correspond to portions of
 + * the files which are identical, whereas subtrees that are not equal correspond to discrepancies between the two files.
-  * <P>
++ * <p>
 + * We can apply this concept to Accumulo, treating a table as a file, and ranges within a file as an Accumulo Range. We can
 + * then compute the hashes over each of these Ranges and compute the entire Merkle tree to determine if two tables are
 + * equivalent.
 + *
 + * @since 1.7.0
 + */
- package org.apache.accumulo.test.replication.merkle;
++package org.apache.accumulo.test.replication.merkle;
++

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
index 5fa9a5f,0000000..769241e
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
@@@ -1,149 -1,0 +1,149 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.replication.merkle.skvi;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.security.MessageDigest;
 +import java.security.NoSuchAlgorithmException;
 +import java.util.Collection;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +
 +/**
 + * {@link SortedKeyValueIterator} which attempts to compute a hash over some range of Key-Value pairs.
-  * <P>
++ * <p>
 + * For the purposes of constructing a Merkle tree, this class will only generate a meaningful result if the (Batch)Scanner will compute a single digest over a
 + * Range. If the (Batch)Scanner stops and restarts in the middle of a session, incorrect values will be returned and the merkle tree will be invalid.
 + */
 +public class DigestIterator implements SortedKeyValueIterator<Key,Value> {
 +  public static final String HASH_NAME_KEY = "hash.name";
 +
 +  private MessageDigest digest;
 +  private Key topKey;
 +  private Value topValue;
 +  private SortedKeyValueIterator<Key,Value> source;
 +
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    String hashName = options.get(HASH_NAME_KEY);
 +    if (null == hashName) {
 +      throw new IOException(HASH_NAME_KEY + " must be provided as option");
 +    }
 +
 +    try {
 +      this.digest = MessageDigest.getInstance(hashName);
 +    } catch (NoSuchAlgorithmException e) {
 +      throw new IOException(e);
 +    }
 +
 +    this.topKey = null;
 +    this.topValue = null;
 +    this.source = source;
 +  }
 +
 +  @Override
 +  public boolean hasTop() {
 +    return null != topKey;
 +  }
 +
 +  @Override
 +  public void next() throws IOException {
 +    // We can't call next() if we already consumed it all
 +    if (!this.source.hasTop()) {
 +      this.topKey = null;
 +      this.topValue = null;
 +      return;
 +    }
 +
 +    this.source.next();
 +
 +    consume();
 +  }
 +
 +  @Override
 +  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
 +    this.source.seek(range, columnFamilies, inclusive);
 +
 +    consume();
 +  }
 +
 +  protected void consume() throws IOException {
 +    digest.reset();
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(baos);
 +
 +    if (!this.source.hasTop()) {
 +      this.topKey = null;
 +      this.topValue = null;
 +
 +      return;
 +    }
 +
 +    Key lastKeySeen = null;
 +    while (this.source.hasTop()) {
 +      baos.reset();
 +
 +      Key currentKey = this.source.getTopKey();
 +      lastKeySeen = currentKey;
 +
 +      currentKey.write(dos);
 +      this.source.getTopValue().write(dos);
 +
 +      digest.update(baos.toByteArray());
 +
 +      this.source.next();
 +    }
 +
 +    this.topKey = lastKeySeen;
 +    this.topValue = new Value(digest.digest());
 +  }
 +
 +  @Override
 +  public Key getTopKey() {
 +    return topKey;
 +  }
 +
 +  @Override
 +  public Value getTopValue() {
 +    return topValue;
 +  }
 +
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    DigestIterator copy = new DigestIterator();
 +    try {
 +      copy.digest = MessageDigest.getInstance(digest.getAlgorithm());
 +    } catch (NoSuchAlgorithmException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    copy.topKey = this.topKey;
 +    copy.topValue = this.topValue;
 +    copy.source = this.source.deepCopy(env);
 +
 +    return copy;
 +  }
 +
 +}