Posted to commits@accumulo.apache.org by ct...@apache.org on 2016/01/09 04:38:16 UTC
[13/19] accumulo git commit: Merge branch 'javadoc-jdk8-1.6' into javadoc-jdk8-1.7
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 01bd23a,0000000..bf582c7
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@@ -1,167 -1,0 +1,167 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor.servlets;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.monitor.Monitor;
+import org.apache.accumulo.monitor.util.Table;
+import org.apache.accumulo.monitor.util.celltypes.NumberType;
+import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Servlet that displays the current state of data replication on the monitor.
+ */
+public class ReplicationServlet extends BasicServlet {
+ private static final Logger log = LoggerFactory.getLogger(ReplicationServlet.class);
+
+ private static final long serialVersionUID = 1L;
+
+ // transient because ReplicationUtil is not serializable and servlets are serializable
+ private transient volatile ReplicationUtil replicationUtil = null;
+
+ private synchronized ReplicationUtil getReplicationUtil() {
+ // make transient replicationUtil available as needed
+ if (replicationUtil == null) {
+ replicationUtil = new ReplicationUtil(Monitor.getContext());
+ }
+ return replicationUtil;
+ }
+
+ @Override
+ protected String getTitle(HttpServletRequest req) {
+ return "Replication Overview";
+ }
+
+ @Override
+ protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws Exception {
+ final Connector conn = Monitor.getContext().getConnector();
+ final MasterMonitorInfo mmi = Monitor.getMmi();
+
+ // The total number of "slots" we have to replicate data
+ int totalWorkQueueSize = getReplicationUtil().getMaxReplicationThreads(mmi);
+
+ TableOperations tops = conn.tableOperations();
+ if (!ReplicationTable.isOnline(conn)) {
+ banner(sb, "", "Replication table is offline");
+ return;
+ }
+
+ Table replicationStats = new Table("replicationStats", "Replication Status");
+ replicationStats.addSortableColumn("Table");
+ replicationStats.addSortableColumn("Peer");
+ replicationStats.addSortableColumn("Remote Identifier");
+ replicationStats.addSortableColumn("ReplicaSystem Type");
+ replicationStats.addSortableColumn("Files needing replication", new NumberType<Long>(), null);
+
+ Map<String,String> peers = getReplicationUtil().getPeers();
+
+ // The total set of configured targets
+ Set<ReplicationTarget> allConfiguredTargets = getReplicationUtil().getReplicationTargets();
+
+ // Number of files per target we have to replicate
+ Map<ReplicationTarget,Long> targetCounts = getReplicationUtil().getPendingReplications();
+
+ Map<String,String> tableNameToId = tops.tableIdMap();
+ Map<String,String> tableIdToName = getReplicationUtil().invert(tableNameToId);
+
+ long filesPendingOverAllTargets = 0L;
+ for (ReplicationTarget configuredTarget : allConfiguredTargets) {
+ String tableName = tableIdToName.get(configuredTarget.getSourceTableId());
+ if (null == tableName) {
+ log.trace("Could not determine table name from id {}", configuredTarget.getSourceTableId());
+ continue;
+ }
+
+ String replicaSystemClass = peers.get(configuredTarget.getPeerName());
+ if (null == replicaSystemClass) {
+ log.trace("Could not determine configured ReplicaSystem for {}", configuredTarget.getPeerName());
+ continue;
+ }
+
+ Long numFiles = targetCounts.get(configuredTarget);
+
+ if (null == numFiles) {
+ replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, 0);
+ } else {
+ replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, numFiles);
+ filesPendingOverAllTargets += numFiles;
+ }
+ }
+
+ // More than 2x the number of slots for replication available, WARN
+ // More than 4x the number of slots for replication available, ERROR
+ NumberType<Long> filesPendingFormat = new NumberType<Long>(Long.valueOf(0), Long.valueOf(2 * totalWorkQueueSize), Long.valueOf(0),
+ Long.valueOf(4 * totalWorkQueueSize));
+
+ String utilization = filesPendingFormat.format(filesPendingOverAllTargets);
+
- sb.append("<div><center><br/><span class=\"table-caption\">Total files pending replication: ").append(utilization).append("</span></center></div>");
++ sb.append("<div><center><br /><span class=\"table-caption\">Total files pending replication: ").append(utilization).append("</span></center></div>");
+
+ replicationStats.generate(req, sb);
+
+ // Make a table for the replication data in progress
+ Table replicationInProgress = new Table("replicationInProgress", "In-Progress Replication");
+ replicationInProgress.addSortableColumn("File");
+ replicationInProgress.addSortableColumn("Peer");
+ replicationInProgress.addSortableColumn("Source Table ID");
+ replicationInProgress.addSortableColumn("Peer Identifier");
+ replicationInProgress.addUnsortableColumn("Status");
+
+ // Read the files from the work queue in ZooKeeper
+ String zkRoot = ZooUtil.getRoot(Monitor.getContext().getInstance());
+ final String workQueuePath = zkRoot + ReplicationConstants.ZOO_WORK_QUEUE;
+
+ DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, Monitor.getContext().getConfiguration());
+
+ try {
+ for (String queueKey : workQueue.getWorkQueued()) {
+ Entry<String,ReplicationTarget> queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey);
+ String filename = queueKeyPair.getKey();
+ ReplicationTarget target = queueKeyPair.getValue();
+
+ String path = getReplicationUtil().getAbsolutePath(conn, workQueuePath, queueKey);
+ String progress = getReplicationUtil().getProgress(conn, path, target);
+
+ // Add a row in the table
+ replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(),
+ progress);
+ }
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Could not calculate replication in progress", e);
+ }
+
+ replicationInProgress.generate(req, sb);
+ }
+}
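
[Editor's note: the WARN/ERROR banding above reduces to a simple threshold comparison. The sketch below is hypothetical (PendingFilesSeverity and severity() are not monitor API) and assumes the NumberType arguments mark values above 2x the work queue size as WARN and above 4x as ERROR.]

public class PendingFilesSeverity {

  // Hypothetical helper mirroring the assumed NumberType banding above:
  // values above 2x the replication slots render WARN, above 4x render ERROR.
  static String severity(long pendingFiles, int totalWorkQueueSize) {
    if (pendingFiles > 4L * totalWorkQueueSize) {
      return "ERROR";
    } else if (pendingFiles > 2L * totalWorkQueueSize) {
      return "WARN";
    }
    return "OK";
  }

  public static void main(String[] args) {
    System.out.println(severity(10, 4)); // WARN (10 > 8, but <= 16)
    System.out.println(severity(20, 4)); // ERROR (20 > 16)
  }
}
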
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 40cb604,fa0b68b..0e0089a
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@@ -43,12 -43,6 +43,12 @@@ public abstract class CompactionStrateg
* {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
* {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
*
- * <P>
++ * <p>
+ * Called while holding the tablet lock, so it should not be doing any blocking.
+ *
- * <P>
++ * <p>
+ * Since no blocking should be done in this method, it is unexpected that it will throw an IOException. However, since it is in the API, it cannot be
+ * easily removed.
*/
public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
@@@ -64,10 -58,6 +64,10 @@@
/**
+ * Get the plan for compacting a tablet's files. Called while holding the tablet lock, so it should not be doing any blocking.
*
- * <P>
++ * <p>
+ * Since no blocking should be done in this method, it is unexpected that this method will throw an IOException. However, since it is in the API, it cannot be
+ * easily removed.
+ *
* @param request
* basic details about the tablet
* @return the plan for a major compaction, or null to cancel the compaction.
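
[Editor's note: for illustration, a minimal strategy honoring the non-blocking contract above might look like the sketch below. CompactAllStrategy is a hypothetical class, and the accessors used (MajorCompactionRequest.getFiles(), CompactionPlan.inputFiles) are assumptions based on the 1.7-era API rather than verified signatures.]

import java.io.IOException;

import org.apache.accumulo.tserver.compaction.CompactionPlan;
import org.apache.accumulo.tserver.compaction.CompactionStrategy;
import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;

// Hypothetical sketch: both methods run while the tablet lock is held,
// so they do only cheap, in-memory work and no blocking I/O.
public class CompactAllStrategy extends CompactionStrategy {

  @Override
  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
    // Cheap decision only; any blocking calls belong in gatherInformation().
    return request.getFiles().size() > 1;
  }

  @Override
  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
    // Build the plan from already-gathered state; compact all input files.
    CompactionPlan plan = new CompactionPlan();
    plan.inputFiles.addAll(request.getFiles().keySet());
    return plan;
  }
}
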
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
index 6afcdf5,0000000..fd19658
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
@@@ -1,38 -1,0 +1,39 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * A <a href="http://en.wikipedia.org/wiki/Merkle_tree">Merkle tree</a> is a hash tree and can be used to evaluate equality over large
+ * files with the ability to ascertain what portions of the files differ. Each leaf of the Merkle tree is some hash of a
+ * portion of the file, with each leaf corresponding to some "range" within the source file. As such, if all leaves are
+ * considered as ranges of the source file, the "sum" of all leaves creates a contiguous range over the entire file.
- * <P>
++ * <p>
+ * The parent of any group of nodes (the tree is typically binary; however, this is not required) is the hash of the concatenation
+ * of its children's hashes. We can construct a full tree by walking up the tree, creating parents from children, until we have a root
+ * node. To check equality of two files that each have a Merkle tree built, we can simply compare the value at the
+ * root of the Merkle tree to know whether or not the files are the same.
- * <P>
++ * <p>
+ * Additionally, in the situation where we have two files that we expect to be the same but are not, we can walk back down
+ * the tree, finding subtrees that are equal and subtrees that are not. Subtrees that are equal correspond to portions of
+ * the files which are identical, while subtrees that are not equal correspond to discrepancies between the two files.
- * <P>
++ * <p>
+ * We can apply this concept to Accumulo, treating a table as a file, and ranges within the table as Accumulo Ranges. We can
+ * then compute the hashes over each of these Ranges and compute the entire Merkle tree to determine if two tables are
+ * equivalent.
+ *
+ * @since 1.7.0
+ */
- package org.apache.accumulo.test.replication.merkle;
++package org.apache.accumulo.test.replication.merkle;
++
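
[Editor's note: to make the construction described above concrete, the following self-contained sketch hashes each leaf range and then hashes the concatenation of child hashes up to a single root. It is independent of Accumulo; MerkleSketch and root() are hypothetical names, used only to illustrate the package documentation.]

import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;

public class MerkleSketch {

  // Computes the Merkle root of the given leaf ranges; assumes at least one leaf.
  public static byte[] root(List<byte[]> leafData, String algorithm) throws Exception {
    MessageDigest md = MessageDigest.getInstance(algorithm);
    List<byte[]> level = new ArrayList<>();
    for (byte[] leaf : leafData) {
      level.add(md.digest(leaf)); // leaf hash over one "range" of the file
    }
    while (level.size() > 1) {
      List<byte[]> parents = new ArrayList<>();
      for (int i = 0; i < level.size(); i += 2) {
        md.reset();
        md.update(level.get(i));
        if (i + 1 < level.size()) {
          md.update(level.get(i + 1)); // parent = hash(left || right)
        }
        // an odd trailing node is promoted as hash(child), one common convention
        parents.add(md.digest());
      }
      level = parents;
    }
    return level.get(0); // equal roots imply (with high probability) equal files
  }
}
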
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
index 5fa9a5f,0000000..769241e
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
@@@ -1,149 -1,0 +1,149 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle.skvi;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * {@link SortedKeyValueIterator} which attempts to compute a hash over some range of Key-Value pairs.
- * <P>
++ * <p>
+ * For the purposes of constructing a Merkle tree, this class will only generate a meaningful result if the (Batch)Scanner computes a single digest over a
+ * Range. If the (Batch)Scanner stops and restarts in the middle of a session, incorrect values will be returned and the Merkle tree will be invalid.
+ */
+public class DigestIterator implements SortedKeyValueIterator<Key,Value> {
+ public static final String HASH_NAME_KEY = "hash.name";
+
+ private MessageDigest digest;
+ private Key topKey;
+ private Value topValue;
+ private SortedKeyValueIterator<Key,Value> source;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ String hashName = options.get(HASH_NAME_KEY);
+ if (null == hashName) {
+ throw new IOException(HASH_NAME_KEY + " must be provided as option");
+ }
+
+ try {
+ this.digest = MessageDigest.getInstance(hashName);
+ } catch (NoSuchAlgorithmException e) {
+ throw new IOException(e);
+ }
+
+ this.topKey = null;
+ this.topValue = null;
+ this.source = source;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return null != topKey;
+ }
+
+ @Override
+ public void next() throws IOException {
+ // We can't call next() if we already consumed it all
+ if (!this.source.hasTop()) {
+ this.topKey = null;
+ this.topValue = null;
+ return;
+ }
+
+ this.source.next();
+
+ consume();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ this.source.seek(range, columnFamilies, inclusive);
+
+ consume();
+ }
+
+ protected void consume() throws IOException {
+ digest.reset();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+
+ if (!this.source.hasTop()) {
+ this.topKey = null;
+ this.topValue = null;
+
+ return;
+ }
+
+ Key lastKeySeen = null;
+ while (this.source.hasTop()) {
+ baos.reset();
+
+ Key currentKey = this.source.getTopKey();
+ lastKeySeen = currentKey;
+
+ currentKey.write(dos);
+ this.source.getTopValue().write(dos);
+
+ digest.update(baos.toByteArray());
+
+ this.source.next();
+ }
+
+ this.topKey = lastKeySeen;
+ this.topValue = new Value(digest.digest());
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ DigestIterator copy = new DigestIterator();
+ try {
+ copy.digest = MessageDigest.getInstance(digest.getAlgorithm());
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+
+ copy.topKey = this.topKey;
+ copy.topValue = this.topValue;
+ copy.source = this.source.deepCopy(env);
+
+ return copy;
+ }
+
+}
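
[Editor's note: for illustration, a client might configure this iterator roughly as in the sketch below. DigestIteratorUsage and digestOfRange are hypothetical names; IteratorSetting, addScanIterator, and Authorizations.EMPTY are standard Accumulo client API. Per the class Javadoc, a plain Scanner is used so the digest covers the whole Range in a single pass.]

import java.util.Map.Entry;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.test.replication.merkle.skvi.DigestIterator;

public class DigestIteratorUsage {

  // Returns the single digest Key-Value pair computed over the given Range.
  // The table name and iterator priority (50) are arbitrary choices.
  static Value digestOfRange(Connector connector, String table, Range range) throws Exception {
    IteratorSetting cfg = new IteratorSetting(50, "digest", DigestIterator.class);
    cfg.addOption(DigestIterator.HASH_NAME_KEY, "SHA-256");

    Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
    scanner.addScanIterator(cfg);
    scanner.setRange(range);

    Value digest = null;
    for (Entry<Key,Value> entry : scanner) {
      digest = entry.getValue(); // one digest per Range when scanned in one pass
    }
    scanner.close();
    return digest;
  }
}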