You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:13 UTC
[34/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
new file mode 100644
index 0000000..0ad2196
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MergeInfo.Operation;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Merge makes things hard.
+ *
+ * Typically, a client will read the list of tablets, and begin an operation on that tablet at the location listed in the metadata table. When a tablet splits,
+ * the information read from the metadata table doesn't match reality, so the operation fails, and must be retried. But the operation will take place either on
+ * the parent, or at a later time on the children. It won't take place on just half of the tablet.
+ *
+ * However, when a merge occurs, the operation may have succeeded on one section of the merged area and not on the others. There is no way to retry the
+ * request at a later time on an unmodified tablet.
+ *
+ * The code below uses a read-write lock to prevent some operations while a merge is taking place. Normal operations, like bulk imports, will grab the read
+ * lock and prevent merges (writes) while they run. Merge operations will lock out some operations while they run.
+ *
+ * <p>This class is the FATE step returned by {@code TableRangeOp.call}: it waits until the master reports no merge in progress for the table, then clears the
+ * table's merge information and releases the table write lock.
+ */
+class TableRangeOpWait extends MasterRepo {
+
+  private static final long serialVersionUID = 1L;
+  private String tableId;
+
+  public TableRangeOpWait(String tableId) {
+    this.tableId = tableId;
+  }
+
+  /**
+   * Reports readiness only once the table's merge state has returned to {@link MergeState#NONE}.
+   *
+   * @return 0 when no merge is in progress, otherwise 50 so FATE checks again later
+   */
+  @Override
+  public long isReady(long tid, Master env) throws Exception {
+    MergeState current = env.getMergeInfo(new Text(tableId)).getState();
+    return current.equals(MergeState.NONE) ? 0 : 50;
+  }
+
+  /**
+   * Logs and clears the table's merge information, then releases the table write lock held by this transaction.
+   *
+   * @return {@code null}; this is the last step of the operation
+   */
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    Text id = new Text(tableId);
+    log.info("removing merge information " + master.getMergeInfo(id));
+    master.clearMergeState(id);
+    Utils.unreserveTable(tableId, tid, true);
+    return null;
+  }
+
+}
+
+/**
+ * FATE step that starts a range operation ({@link MergeInfo.Operation}, e.g. MERGE) on a table.
+ *
+ * <p>It takes the table write lock, validates the requested row range, and, if no merge is already recorded for the table, records a new {@link MergeInfo}
+ * in state {@link MergeState#STARTED}. It then hands off to {@link TableRangeOpWait}, which waits for the merge state to return to NONE.
+ */
+public class TableRangeOp extends MasterRepo {
+
+  private static final long serialVersionUID = 1L;
+
+  private String tableId;
+  private byte[] startRow;
+  private byte[] endRow;
+  private Operation op;
+
+  /**
+   * Reserves the table for writing; the table must exist.
+   */
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE);
+  }
+
+  /**
+   * @param op
+   *          the range operation to perform
+   * @param tableId
+   *          id of the table to operate on
+   * @param startRow
+   *          start of the range, used as the extent's previous end row; an empty row means unbounded
+   * @param endRow
+   *          end of the range; an empty row means unbounded
+   * @throws ThriftTableOperationException
+   *           declared for callers; not thrown by this constructor itself
+   */
+  public TableRangeOp(MergeInfo.Operation op, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException {
+
+    this.tableId = tableId;
+    this.startRow = TextUtil.getBytes(startRow);
+    this.endRow = TextUtil.getBytes(endRow);
+    this.op = op;
+  }
+
+  /**
+   * Validates the range and records the merge request.
+   *
+   * @throws ThriftTableOperationException
+   *           with type BAD_RANGE when both rows are given and start is not less than end
+   */
+  @Override
+  public Repo<Master> call(long tid, Master env) throws Exception {
+
+    // op is a MergeInfo.Operation, so it must be compared against that enum; comparing it with the
+    // thrift TableOperation.MERGE constant (a different enum type) could never be true.
+    if (RootTable.ID.equals(tableId) && Operation.MERGE.equals(op)) {
+      log.warn("Attempt to merge tablets for " + RootTable.NAME + " does nothing. It is not splittable.");
+    }
+
+    Text start = startRow.length == 0 ? null : new Text(startRow);
+    Text end = endRow.length == 0 ? null : new Text(endRow);
+    Text tableIdText = new Text(tableId);
+
+    if (start != null && end != null && start.compareTo(end) >= 0) {
+      throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.BAD_RANGE,
+          "start row must be less than end row");
+    }
+
+    env.mustBeOnline(tableId);
+
+    MergeInfo info = env.getMergeInfo(tableIdText);
+
+    // Only record a new merge when none is recorded; otherwise the next step just waits for the existing one.
+    if (info.getState() == MergeState.NONE) {
+      KeyExtent range = new KeyExtent(tableIdText, end, start);
+      env.setMergeState(new MergeInfo(range, op), MergeState.STARTED);
+    }
+
+    return new TableRangeOpWait(tableId);
+  }
+
+  /**
+   * Clears any merge information for the table and releases the table write lock.
+   */
+  @Override
+  public void undo(long tid, Master env) throws Exception {
+    // Not sure this is a good thing to do. The Master state engine should be the one to remove it.
+    Text tableIdText = new Text(tableId);
+    MergeInfo mergeInfo = env.getMergeInfo(tableIdText);
+    // only the log message is conditional; the merge state is always cleared
+    if (mergeInfo.getState() != MergeState.NONE) {
+      log.info("removing merge information " + mergeInfo);
+    }
+    env.clearMergeState(tableIdText);
+    Utils.unreserveTable(tableId, tid, true);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java
new file mode 100644
index 0000000..dd691b4
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.accumulo.trace.thrift.TInfo;
+
+/**
+ * Decorator for a FATE {@link Repo} that runs each of the wrapped repo's steps inside a trace span.
+ *
+ * <p>The trace context is captured with {@code Tracer.traceInfo()} when the wrapper is constructed. Every span opened by {@link #isReady}, {@link #call} and
+ * {@link #undo} is attached to that context and named after the wrapped repo's description. A non-null repo returned by {@link #call} is wrapped in turn,
+ * so the whole chain stays traced.
+ */
+public class TraceRepo<T> implements Repo<T> {
+
+  private static final long serialVersionUID = 1L;
+
+  TInfo tinfo;
+  Repo<T> repo;
+
+  public TraceRepo(Repo<T> repo) {
+    this.repo = repo;
+    tinfo = Tracer.traceInfo();
+  }
+
+  /** Opens a span under the captured trace context, named after the wrapped repo. */
+  private Span startSpan() {
+    return Trace.trace(tinfo, repo.getDescription());
+  }
+
+  @Override
+  public long isReady(long tid, T environment) throws Exception {
+    Span traced = startSpan();
+    try {
+      return repo.isReady(tid, environment);
+    } finally {
+      traced.stop();
+    }
+  }
+
+  @Override
+  public Repo<T> call(long tid, T environment) throws Exception {
+    Span traced = startSpan();
+    try {
+      Repo<T> next = repo.call(tid, environment);
+      // the follow-up step is wrapped while this span is still open
+      return next == null ? null : new TraceRepo<T>(next);
+    } finally {
+      traced.stop();
+    }
+  }
+
+  @Override
+  public void undo(long tid, T environment) throws Exception {
+    Span traced = startSpan();
+    try {
+      repo.undo(tid, environment);
+    } finally {
+      traced.stop();
+    }
+  }
+
+  /** Delegates to the wrapped repo. */
+  @Override
+  public String getDescription() {
+    return repo.getDescription();
+  }
+
+  /** Delegates to the wrapped repo. */
+  @Override
+  public String getReturn() {
+    return repo.getReturn();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
new file mode 100644
index 0000000..fa14f43
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.math.BigInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
+import org.apache.accumulo.fate.zookeeper.ZooReservation;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooQueueLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+public class Utils {
+
+ static void checkTableDoesNotExist(Instance instance, String tableName, String tableId, TableOperation operation) throws ThriftTableOperationException {
+
+ String id = Tables.getNameToIdMap(instance).get(tableName);
+
+ if (id != null && !id.equals(tableId))
+ throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.EXISTS, null);
+ }
+
+ static String getNextTableId(String tableName, Instance instance) throws ThriftTableOperationException {
+
+ String tableId = null;
+ try {
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ final String ntp = ZooUtil.getRoot(instance) + Constants.ZTABLES;
+ byte[] nid = zoo.mutate(ntp, "0".getBytes(), ZooUtil.PUBLIC, new Mutator() {
+ @Override
+ public byte[] mutate(byte[] currentValue) throws Exception {
+ BigInteger nextId = new BigInteger(new String(currentValue), Character.MAX_RADIX);
+ nextId = nextId.add(BigInteger.ONE);
+ return nextId.toString(Character.MAX_RADIX).getBytes();
+ }
+ });
+ return new String(nid);
+ } catch (Exception e1) {
+ Logger.getLogger(CreateTable.class).error("Failed to assign tableId to " + tableName, e1);
+ throw new ThriftTableOperationException(tableId, tableName, TableOperation.CREATE, TableOperationExceptionType.OTHER, e1.getMessage());
+ }
+ }
+
+ static final Lock tableNameLock = new ReentrantLock();
+ static final Lock idLock = new ReentrantLock();
+ private static final Logger log = Logger.getLogger(Utils.class);
+
+ public static long reserveTable(String tableId, long tid, boolean writeLock, boolean tableMustExist, TableOperation op) throws Exception {
+ if (getLock(tableId, tid, writeLock).tryLock()) {
+ if (tableMustExist) {
+ Instance instance = HdfsZooInstance.getInstance();
+ IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance();
+ if (!zk.exists(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId))
+ throw new ThriftTableOperationException(tableId, "", op, TableOperationExceptionType.NOTFOUND, "Table does not exists");
+ }
+ log.info("table " + tableId + " (" + Long.toHexString(tid) + ") locked for " + (writeLock ? "write" : "read") + " operation: " + op);
+ return 0;
+ } else
+ return 100;
+ }
+
+ public static void unreserveTable(String tableId, long tid, boolean writeLock) throws Exception {
+ getLock(tableId, tid, writeLock).unlock();
+ log.info("table " + tableId + " (" + Long.toHexString(tid) + ") unlocked for " + (writeLock ? "write" : "read"));
+ }
+
+ public static long reserveHdfsDirectory(String directory, long tid) throws KeeperException, InterruptedException {
+ Instance instance = HdfsZooInstance.getInstance();
+
+ String resvPath = ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS + "/" + new String(Base64.encodeBase64(directory.getBytes()));
+
+ IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance();
+
+ if (ZooReservation.attempt(zk, resvPath, String.format("%016x", tid), "")) {
+ return 0;
+ } else
+ return 50;
+ }
+
+ public static void unreserveHdfsDirectory(String directory, long tid) throws KeeperException, InterruptedException {
+ Instance instance = HdfsZooInstance.getInstance();
+ String resvPath = ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS + "/" + new String(Base64.encodeBase64(directory.getBytes()));
+ ZooReservation.release(ZooReaderWriter.getRetryingInstance(), resvPath, String.format("%016x", tid));
+ }
+
+ private static Lock getLock(String tableId, long tid, boolean writeLock) throws Exception {
+ byte[] lockData = String.format("%016x", tid).getBytes();
+ ZooQueueLock qlock = new ZooQueueLock(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLE_LOCKS + "/" + tableId, false);
+ Lock lock = DistributedReadWriteLock.recoverLock(qlock, lockData);
+ if (lock == null) {
+ DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, lockData);
+ if (writeLock)
+ lock = locker.writeLock();
+ else
+ lock = locker.readLock();
+ }
+ return lock;
+ }
+
+ public static Lock getReadLock(String tableId, long tid) throws Exception {
+ return Utils.getLock(tableId, tid, false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
new file mode 100644
index 0000000..7189637
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tserverOps;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.master.EventCoordinator.Listener;
+import org.apache.accumulo.master.tableOps.MasterRepo;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TTransportException;
+
+public class ShutdownTServer extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger log = Logger.getLogger(ShutdownTServer.class);
+ private TServerInstance server;
+ private boolean force;
+
+ public ShutdownTServer(TServerInstance server, boolean force) {
+ this.server = server;
+ this.force = force;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ // suppress assignment of tablets to the server
+ if (force) {
+ String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZTSERVERS + "/" + server.getLocation();
+ ZooLock.deleteLock(path);
+ path = ZooUtil.getRoot(master.getInstance()) + Constants.ZDEADTSERVERS + "/" + server.getLocation();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ zoo.putPersistentData(path, "forced down".getBytes(), NodeExistsPolicy.OVERWRITE);
+ return null;
+ }
+
+ // TODO move this to isReady() and drop while loop? - ACCUMULO-1259
+ Listener listener = master.getEventCoordinator().getListener();
+ master.shutdownTServer(server);
+ while (master.onlineTabletServers().contains(server)) {
+ TServerConnection connection = master.getConnection(server);
+ if (connection != null) {
+ try {
+ TabletServerStatus status = connection.getTableMap(false);
+ if (status.tableMap != null && status.tableMap.isEmpty()) {
+ log.info("tablet server hosts no tablets " + server);
+ connection.halt(master.getMasterLock());
+ log.info("tablet server asked to halt " + server);
+ break;
+ }
+ } catch (TTransportException ex) {
+ // expected
+ } catch (Exception ex) {
+ log.error("Error talking to tablet server " + server + ": " + ex);
+ }
+ }
+ listener.waitForEvents(1000);
+ }
+
+ return null;
+ }
+
+ @Override
+ public void undo(long tid, Master m) throws Exception {}
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/util/FateAdmin.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/util/FateAdmin.java b/server/master/src/main/java/org/apache/accumulo/master/util/FateAdmin.java
new file mode 100644
index 0000000..f794112
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/util/FateAdmin.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/**
+ * A utility to administer FATE operations.
+ *
+ * <p>Superseded by the shell's {@code fate} command; {@link #main} prints a notice to that effect.
+ */
+public class FateAdmin {
+
+  /** Options shared by commands that act on one or more FATE transaction ids. */
+  static class TxOpts {
+    @Parameter(description = "<txid>", required = true)
+    List<String> args = new ArrayList<String>();
+  }
+
+  @Parameters(commandDescription = "Stop an existing FATE by transaction id")
+  static class FailOpts extends TxOpts {}
+
+  @Parameters(commandDescription = "Delete an existing FATE by transaction id")
+  static class DeleteOpts extends TxOpts {}
+
+  @Parameters(commandDescription = "List the existing FATE transactions")
+  static class PrintOpts {}
+
+  /**
+   * Entry point: {@code fail <txid>...}, {@code delete <txid>...} or {@code print}. Exits with status 1 after printing usage when help is requested or no
+   * command is given.
+   */
+  public static void main(String[] args) throws Exception {
+    Help opts = new Help();
+    FailOpts failOpts = new FailOpts();
+    DeleteOpts deleteOpts = new DeleteOpts();
+    JCommander jc = new JCommander(opts);
+    jc.setProgramName(FateAdmin.class.getName());
+    jc.addCommand("fail", failOpts);
+    jc.addCommand("delete", deleteOpts);
+    jc.addCommand("print", new PrintOpts());
+    jc.parse(args);
+    String command = jc.getParsedCommand();
+    if (opts.help || command == null) {
+      jc.usage();
+      System.exit(1);
+    }
+
+    System.err.printf("This tool has been deprecated%nFATE administration now available within 'accumulo shell'%n$ fate fail <txid>... | delete <txid>... | print [<txid>...]%n%n");
+
+    AdminUtil<Master> admin = new AdminUtil<Master>();
+
+    Instance instance = HdfsZooInstance.getInstance();
+    String path = ZooUtil.getRoot(instance) + Constants.ZFATE;
+    String masterPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK;
+    IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance();
+    ZooStore<Master> zs = new ZooStore<Master>(path, zk);
+
+    // Use the txids JCommander parsed for the command rather than the raw args[1]; this processes every txid given instead of silently ignoring all
+    // but the first. A single-txid invocation behaves exactly as before.
+    if (command.equals("fail")) {
+      for (String txid : failOpts.args) {
+        admin.prepFail(zs, zk, masterPath, txid);
+      }
+    } else if (command.equals("delete")) {
+      for (String txid : deleteOpts.args) {
+        admin.prepDelete(zs, zk, masterPath, txid);
+        admin.deleteLocks(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, txid);
+      }
+    } else if (command.equals("print")) {
+      admin.print(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/test/java/org/apache/accumulo/master/DefaultMapTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/DefaultMapTest.java b/server/master/src/test/java/org/apache/accumulo/master/DefaultMapTest.java
new file mode 100644
index 0000000..3389aa3
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/DefaultMapTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import org.apache.accumulo.server.util.DefaultMap;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class DefaultMapTest {
+
+  /**
+   * A stored key returns its value; a missing key returns the default, and repeated lookups of missing keys return that same default instance.
+   */
+  @Test
+  public void testDefaultMap() {
+    DefaultMap<String,String> map = new DefaultMap<String,String>("");
+    map.put("key", "value");
+    String empty = map.get("otherKey");
+    // JUnit's assertEquals takes (expected, actual)
+    assertEquals("value", map.get("key"));
+    assertEquals("", empty);
+    assertSame(empty, map.get("otherKey"));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
new file mode 100644
index 0000000..f435062
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.master.state.MergeStats;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Drives a merge of the extent of mock table "t" with previous end row "e" and end row "p" through {@link MergeStats#nextMergeState} checks, editing a
+ * mock metadata table between checks.
+ */
+public class TestMergeState {
+
+  /** Minimal {@link CurrentState}: table "t" online, a single tablet server, and one merge in progress. */
+  static class MockCurrentState implements CurrentState {
+
+    TServerInstance someTServer = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456);
+    MergeInfo mergeInfo;
+
+    MockCurrentState(MergeInfo info) {
+      this.mergeInfo = info;
+    }
+
+    @Override
+    public Set<String> onlineTables() {
+      return Collections.singleton("t");
+    }
+
+    @Override
+    public Set<TServerInstance> onlineTabletServers() {
+      return Collections.singleton(someTServer);
+    }
+
+    @Override
+    public Collection<MergeInfo> merges() {
+      return Collections.singleton(mergeInfo);
+    }
+  }
+
+  /** Writes a single mutation to the metadata table. */
+  private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {
+    BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    bw.addMutation(m);
+    bw.close();
+  }
+
+  @Test
+  public void test() throws Exception {
+    Instance instance = new MockInstance();
+    Connector connector = instance.getConnector("root", new PasswordToken(""));
+    // use the same table name as update() and MetaDataStateStore rather than a hard-coded literal
+    BatchWriter bw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+    // Create a fake METADATA table with these splits
+    String splits[] = {"a", "e", "j", "o", "t", "z"};
+    // create metadata for a table "t" with the splits above
+    Text tableId = new Text("t");
+    Text pr = null;
+    for (String s : splits) {
+      Text split = new Text(s);
+      Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr));
+      prevRow.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+      ChoppedColumnFamily.CHOPPED_COLUMN.put(prevRow, new Value("junk".getBytes()));
+      bw.addMutation(prevRow);
+      pr = split;
+    }
+    // Add the default tablet
+    Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr));
+    defaultTablet.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+    bw.addMutation(defaultTablet);
+    bw.close();
+
+    // Read out the TabletLocationStates
+    MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE));
+    Credentials credentials = new Credentials("root", new PasswordToken(new byte[0]));
+
+    // Verify the tablet state: hosted, and count
+    MetaDataStateStore metaDataStateStore = new MetaDataStateStore(instance, credentials, state);
+    int count = 0;
+    for (TabletLocationState tss : metaDataStateStore) {
+      Assert.assertEquals(TabletState.HOSTED, tss.getState(state.onlineTabletServers()));
+      count++;
+    }
+    Assert.assertEquals(splits.length + 1, count);
+
+    // Create the hole
+    // Split the tablet at one end of the range
+    Mutation m = new KeyExtent(tableId, new Text("t"), new Text("p")).getPrevRowUpdateMutation();
+    TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
+    TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(new Text("o")));
+    update(connector, m);
+
+    // do the state check
+    MergeStats stats = scan(state, metaDataStateStore);
+    MergeState newState = stats.nextMergeState(connector, state);
+    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
+
+    // unassign the tablets
+    BatchDeleter deleter = connector.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 1000, new BatchWriterConfig());
+    deleter.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+    deleter.setRanges(Collections.singletonList(new Range()));
+    deleter.delete();
+
+    // now we should be ready to merge but, we have an inconsistent metadata table
+    stats = scan(state, metaDataStateStore);
+    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
+
+    // finish the split
+    KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
+    m = tablet.getPrevRowUpdateMutation();
+    TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
+    update(connector, m);
+    metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer)));
+
+    // uh-oh... there's a new tablet online
+    stats = scan(state, metaDataStateStore);
+    Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state));
+
+    // chop it
+    m = tablet.getPrevRowUpdateMutation();
+    ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk".getBytes()));
+    update(connector, m);
+
+    stats = scan(state, metaDataStateStore);
+    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
+
+    // take it offline
+    Collection<Collection<String>> walogs = Collections.emptyList();
+    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)));
+
+    // now we can merge
+    stats = scan(state, metaDataStateStore);
+    Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state));
+
+  }
+
+  /**
+   * Builds fresh merge statistics for the state's merge, starting from WAITING_FOR_OFFLINE, by feeding every tablet in the state store to
+   * {@link MergeStats#update}.
+   *
+   * @param state
+   *          supplies the merge being checked and the online tablet servers
+   * @param metaDataStateStore
+   *          the tablets to scan
+   * @return the populated statistics
+   */
+  private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) {
+    MergeStats stats = new MergeStats(state.mergeInfo);
+    stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE);
+    for (TabletLocationState tss : metaDataStateStore) {
+      stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped, false);
+    }
+    return stats;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java
new file mode 100644
index 0000000..d7fc619
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+ public class MergeInfoTest {
+
+   /**
+    * Serializes {@code info} with {@link MergeInfo#write} and reads it back with {@link MergeInfo#readFields},
+    * asserting that the extent, state and operation survive the round trip.
+    *
+    * @param info the merge info to round-trip
+    * @return the deserialized copy
+    */
+   MergeInfo readWrite(MergeInfo info) throws Exception {
+     DataOutputBuffer buffer = new DataOutputBuffer();
+     info.write(buffer);
+     DataInputBuffer in = new DataInputBuffer();
+     in.reset(buffer.getData(), 0, buffer.getLength());
+     MergeInfo info2 = new MergeInfo();
+     info2.readFields(in);
+     Assert.assertEquals(info.getExtent(), info2.getExtent());
+     Assert.assertEquals(info.getState(), info2.getState());
+     Assert.assertEquals(info.getOperation(), info2.getOperation());
+     return info2;
+   }
+
+   /** Builds a KeyExtent for {@code tableId}; a null row argument is passed through as a null {@link Text}. */
+   KeyExtent ke(String tableId, String endRow, String prevEndRow) {
+     return new KeyExtent(new Text(tableId), endRow == null ? null : new Text(endRow), prevEndRow == null ? null : new Text(prevEndRow));
+   }
+
+   @Test
+   public void testWritable() throws Exception {
+     MergeInfo info;
+     info = readWrite(new MergeInfo(ke("a", null, "b"), MergeInfo.Operation.MERGE));
+     info = readWrite(new MergeInfo(ke("a", "b", null), MergeInfo.Operation.MERGE));
+     info = readWrite(new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.MERGE));
+     info = readWrite(new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.DELETE));
+     Assert.assertTrue(info.isDelete());
+     // A non-default state must survive serialization too, so round-trip it and check it.
+     info.setState(MergeState.COMPLETE);
+     info = readWrite(info);
+     Assert.assertEquals(MergeState.COMPLETE, info.getState());
+     Assert.assertTrue(info.isDelete());
+   }
+
+   @Test
+   public void testNeedsToBeChopped() throws Exception {
+     MergeInfo info = new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.DELETE);
+     Assert.assertTrue(info.needsToBeChopped(ke("x", "c", "b")));
+     Assert.assertTrue(info.overlaps(ke("x", "c", "b")));
+     Assert.assertFalse(info.needsToBeChopped(ke("y", "c", "b")));
+     Assert.assertFalse(info.needsToBeChopped(ke("x", "c", "bb")));
+     Assert.assertFalse(info.needsToBeChopped(ke("x", "b", "a")));
+     info = new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.MERGE);
+     Assert.assertTrue(info.needsToBeChopped(ke("x", "c", "a")));
+     Assert.assertTrue(info.needsToBeChopped(ke("x", "aa", "a")));
+     Assert.assertTrue(info.needsToBeChopped(ke("x", null, null)));
+     Assert.assertFalse(info.needsToBeChopped(ke("x", "c", "b")));
+     Assert.assertFalse(info.needsToBeChopped(ke("y", "c", "b")));
+     Assert.assertFalse(info.needsToBeChopped(ke("x", "c", "bb")));
+     Assert.assertTrue(info.needsToBeChopped(ke("x", "b", "a")));
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
new file mode 100644
index 0000000..3479d35
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStore;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+ public class RootTabletStateStoreTest {
+
+   /** A node of the in-memory tree backing {@link FakeZooStore}. */
+   static class Node {
+     Node(String name) {
+       this.name = name;
+     }
+
+     List<Node> children = new ArrayList<Node>();
+     String name;
+     // nodes created implicitly as path components start with an empty value
+     byte[] value = new byte[] {};
+
+     /** Returns the direct child with the given name, or null if there is none. */
+     Node find(String name) {
+       for (Node node : children)
+         if (node.name.equals(name))
+           return node;
+       return null;
+     }
+   }
+
+   /** An in-memory {@link DistributedStore} that models ZooKeeper paths as a tree of {@link Node}s. */
+   static class FakeZooStore implements DistributedStore {
+
+     Node root = new Node("/");
+
+     /** Follows path[depth..] from {@code root}; returns null as soon as a component is missing. */
+     private Node recurse(Node root, String[] path, int depth) {
+       if (depth == path.length)
+         return root;
+       Node child = root.find(path[depth]);
+       if (child == null)
+         return null;
+       return recurse(child, path, depth + 1);
+     }
+
+     /** Resolves an absolute path (one trailing slash is ignored); index 0 of the split is the empty root part. */
+     private Node navigate(String path) {
+       path = path.replaceAll("/$", "");
+       return recurse(root, path.split("/"), 1);
+     }
+
+     /** Returns the child names of {@code path}, or an empty list if the path does not exist. */
+     @Override
+     public List<String> getChildren(String path) throws DistributedStoreException {
+       Node node = navigate(path);
+       if (node == null)
+         return Collections.emptyList();
+       List<String> children = new ArrayList<String>(node.children.size());
+       for (Node child : node.children)
+         children.add(child.name);
+       return children;
+     }
+
+     /** Stores {@code bs} at {@code path}, creating any missing intermediate nodes. */
+     @Override
+     public void put(String path, byte[] bs) throws DistributedStoreException {
+       create(path).value = bs;
+     }
+
+     private Node create(String path) {
+       String[] parts = path.split("/");
+       return recurseCreate(root, parts, 1);
+     }
+
+     private Node recurseCreate(Node root, String[] path, int index) {
+       if (path.length == index)
+         return root;
+       Node node = root.find(path[index]);
+       if (node == null) {
+         node = new Node(path[index]);
+         root.children.add(node);
+       }
+       return recurseCreate(node, path, index + 1);
+     }
+
+     /** Removes the node at {@code path}; does nothing if it (or its parent) does not exist. */
+     @Override
+     public void remove(String path) throws DistributedStoreException {
+       String[] parts = path.split("/");
+       String[] parentPath = Arrays.copyOf(parts, parts.length - 1);
+       Node parent = recurse(root, parentPath, 1);
+       if (parent == null)
+         return;
+       Node child = parent.find(parts[parts.length - 1]);
+       if (child != null)
+         parent.children.remove(child);
+     }
+
+     /** Returns the value stored at {@code path}, or null if the path does not exist. */
+     @Override
+     public byte[] get(String path) throws DistributedStoreException {
+       Node node = navigate(path);
+       if (node != null)
+         return node.value;
+       return null;
+     }
+   }
+
+   @Test
+   public void testFakeZoo() throws DistributedStoreException {
+     DistributedStore store = new FakeZooStore();
+     store.put("/a/b/c", "abc".getBytes());
+     byte[] abc = store.get("/a/b/c");
+     assertArrayEquals("abc".getBytes(), abc);
+     byte[] empty = store.get("/a/b");
+     assertArrayEquals("".getBytes(), empty);
+     store.put("/a/b", "ab".getBytes());
+     assertArrayEquals("ab".getBytes(), store.get("/a/b"));
+     store.put("/a/b/b", "abb".getBytes());
+     List<String> children = store.getChildren("/a/b");
+     assertEquals(new HashSet<String>(Arrays.asList("b", "c")), new HashSet<String>(children));
+     store.remove("/a/b/c");
+     children = store.getChildren("/a/b");
+     assertEquals(new HashSet<String>(Arrays.asList("b")), new HashSet<String>(children));
+   }
+
+   @Test
+   public void testRootTabletStateStore() throws DistributedStoreException {
+     ZooTabletStateStore tstore = new ZooTabletStateStore(new FakeZooStore());
+     KeyExtent root = RootTable.EXTENT;
+     String sessionId = "this is my unique session data";
+     TServerInstance server = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 10000), sessionId);
+     List<Assignment> assignments = Collections.singletonList(new Assignment(root, server));
+
+     // future location only
+     tstore.setFutureLocations(assignments);
+     int count = 0;
+     for (TabletLocationState location : tstore) {
+       assertEquals(root, location.extent);
+       assertEquals(server, location.future);
+       assertNull(location.current);
+       count++;
+     }
+     assertEquals(1, count);
+
+     // future location promoted to current location
+     tstore.setLocations(assignments);
+     count = 0;
+     for (TabletLocationState location : tstore) {
+       assertEquals(root, location.extent);
+       assertNull(location.future);
+       assertEquals(server, location.current);
+       count++;
+     }
+     assertEquals(1, count);
+
+     // unassigned: neither future nor current location
+     TabletLocationState assigned = null;
+     try {
+       assigned = new TabletLocationState(root, server, null, null, null, false);
+     } catch (BadLocationStateException e) {
+       fail("Unexpected error " + e);
+     }
+     tstore.unassign(Collections.singletonList(assigned));
+     count = 0;
+     for (TabletLocationState location : tstore) {
+       assertEquals(root, location.extent);
+       assertNull(location.future);
+       assertNull(location.current);
+       count++;
+     }
+     assertEquals(1, count);
+
+     // every operation must reject an extent that is not the root tablet
+     KeyExtent notRoot = new KeyExtent(new Text("0"), null, null);
+     try {
+       tstore.setLocations(Collections.singletonList(new Assignment(notRoot, server)));
+       Assert.fail("should not get here");
+     } catch (IllegalArgumentException expected) {
+       // expected: non-root extent rejected
+     }
+
+     try {
+       tstore.setFutureLocations(Collections.singletonList(new Assignment(notRoot, server)));
+       Assert.fail("should not get here");
+     } catch (IllegalArgumentException expected) {
+       // expected: non-root extent rejected
+     }
+
+     TabletLocationState broken = null;
+     try {
+       broken = new TabletLocationState(notRoot, server, null, null, null, false);
+     } catch (BadLocationStateException e) {
+       fail("Unexpected error " + e);
+     }
+     try {
+       tstore.unassign(Collections.singletonList(broken));
+       Assert.fail("should not get here");
+     } catch (IllegalArgumentException expected) {
+       // expected: non-root extent rejected
+     }
+   }
+
+   // @Test
+   // public void testMetaDataStore() { } // see functional test
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/monitor/pom.xml
----------------------------------------------------------------------
diff --git a/server/monitor/pom.xml b/server/monitor/pom.xml
new file mode 100644
index 0000000..6f6a147
--- /dev/null
+++ b/server/monitor/pom.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-project</artifactId>
+ <version>1.6.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <artifactId>accumulo-monitor</artifactId>
+ <name>Monitor Server</name>
+ <dependencies>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-fate</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-server-base</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-start</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-trace</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/resources/web/flot/**/*.js</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java
new file mode 100644
index 0000000..e16b598
--- /dev/null
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor;
+
+import javax.servlet.http.HttpServlet;
+
+import org.apache.accumulo.core.conf.Property;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.bio.SocketConnector;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.security.SslSocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.SessionHandler;
+
+public class EmbeddedWebServer {
+
+ Server server = null;
+ SocketConnector sock;
+ ContextHandlerCollection handler;
+ Context root;
+ boolean usingSsl;
+
+ public EmbeddedWebServer() {
+ this("0.0.0.0", 0);
+ }
+
+ public EmbeddedWebServer(String host, int port) {
+ server = new Server();
+ handler = new ContextHandlerCollection();
+ root = new Context(handler, "/", new SessionHandler(), null, null, null);
+
+ if (Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_KEYSTORE) == ""
+ || Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_KEYSTOREPASS) == ""
+ || Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_TRUSTSTORE) == ""
+ || Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_TRUSTSTOREPASS) == "") {
+ sock = new SocketConnector();
+ usingSsl = false;
+ } else {
+ sock = new SslSocketConnector();
+ ((SslSocketConnector) sock).setKeystore(Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_KEYSTORE));
+ ((SslSocketConnector) sock).setKeyPassword(Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_KEYSTOREPASS));
+ ((SslSocketConnector) sock).setTruststore(Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_TRUSTSTORE));
+ ((SslSocketConnector) sock).setTrustPassword(Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_TRUSTSTOREPASS));
+ usingSsl = true;
+ }
+ sock.setHost(host);
+ sock.setPort(port);
+ }
+
+ public void addServlet(Class<? extends HttpServlet> klass, String where) {
+ root.addServlet(klass, where);
+ }
+
+ public int getPort() {
+ return sock.getLocalPort();
+ }
+
+ public void start() {
+ try {
+ server.addConnector(sock);
+ server.setHandler(handler);
+ server.start();
+ } catch (Exception e) {
+ stop();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void stop() {
+ try {
+ server.stop();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public boolean isUsingSsl() {
+ return usingSsl;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
new file mode 100644
index 0000000..aaec7e4
--- /dev/null
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.gc.thrift.GCMonitorService;
+import org.apache.accumulo.core.gc.thrift.GCStatus;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.monitor.servlets.DefaultServlet;
+import org.apache.accumulo.monitor.servlets.GcStatusServlet;
+import org.apache.accumulo.monitor.servlets.JSONServlet;
+import org.apache.accumulo.monitor.servlets.LogServlet;
+import org.apache.accumulo.monitor.servlets.MasterServlet;
+import org.apache.accumulo.monitor.servlets.OperationServlet;
+import org.apache.accumulo.monitor.servlets.ProblemServlet;
+import org.apache.accumulo.monitor.servlets.ShellServlet;
+import org.apache.accumulo.monitor.servlets.TServersServlet;
+import org.apache.accumulo.monitor.servlets.TablesServlet;
+import org.apache.accumulo.monitor.servlets.VisServlet;
+import org.apache.accumulo.monitor.servlets.XMLServlet;
+import org.apache.accumulo.monitor.servlets.trace.ListType;
+import org.apache.accumulo.monitor.servlets.trace.ShowTrace;
+import org.apache.accumulo.monitor.servlets.trace.Summary;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.monitor.LogService;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.util.TableInfoUtil;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Serve master statistics with an embedded web server.
+ */
+public class Monitor {
+ private static final Logger log = Logger.getLogger(Monitor.class);
+
+ // Minimum interval, in seconds, between recalculations: fetchData() returns early while
+ // currentTime - lastRecalc < REFRESH_TIME * 1000 (milliseconds).
+ private static final int REFRESH_TIME = 5;
+ // Millisecond timestamp compared with System.currentTimeMillis() in fetchData() to throttle recalculation.
+ private static long lastRecalc = 0L;
+ // Cluster-wide aggregates published by fetchData(). Rates, entries, hold time and lookups are summed over the
+ // master's tablet server statuses; tablet and table counts are summed over the master's table map.
+ // The two byte rates are stored divided by 1,000,000 (presumably MB/s if the raw values are bytes/s).
+ private static double totalIngestRate = 0.0;
+ private static double totalIngestByteRate = 0.0;
+ private static double totalQueryRate = 0.0;
+ private static double totalScanRate = 0.0;
+ private static double totalQueryByteRate = 0.0;
+ private static long totalEntries = 0L;
+ private static int totalTabletCount = 0;
+ private static int onlineTabletCount = 0;
+ private static long totalHoldTime = 0;
+ private static long totalLookups = 0;
+ private static int totalTables = 0;
+
+   /**
+    * A list of (timestamp, value) samples that keeps only the samples whose timestamp is within {@code maxDelta} of the
+    * most recently added one. Assumes samples are appended in non-decreasing timestamp order.
+    */
+   private static class MaxList<T> extends LinkedList<Pair<Long,T>> {
+     private static final long serialVersionUID = 1L;
+
+     private final long maxDelta;
+
+     public MaxList(long maxDelta) {
+       this.maxDelta = maxDelta;
+     }
+
+     /**
+      * Appends a sample, then evicts every sample at the head that is more than {@code maxDelta} older than it.
+      * Evicting in a loop, rather than at most one element per call, keeps the window bounded even after a long gap
+      * between samples (e.g. if fetchData() is not invoked for a while).
+      */
+     @Override
+     public boolean add(Pair<Long,T> obj) {
+       boolean result = super.add(obj);
+
+       long newest = obj.getFirst();
+       while (!isEmpty() && newest - peekFirst().getFirst() > maxDelta) {
+         removeFirst();
+       }
+
+       return result;
+     }
+
+   }
+
+ // Sliding-window length for the *OverTime histories: 60 * 60 * 1000 ms, i.e. one hour
+ // (sample timestamps come from System.currentTimeMillis() in fetchData()).
+ private static final int MAX_TIME_PERIOD = 60 * 60 * 1000;
+ // (timestamp, value) time series appended to by fetchData(); each is wrapped by Collections.synchronizedList,
+ // so individual operations are synchronized (iteration still needs external locking on the list).
+ private static final List<Pair<Long,Double>> loadOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Double>> ingestRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Double>> ingestByteRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Integer>> recoveriesOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Integer>> minorCompactionsOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Integer>> majorCompactionsOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Double>> lookupsOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Integer>> queryRateOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Integer>> scanRateOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Double>> queryByteRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Double>> indexCacheHitRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+ private static final List<Pair<Long,Double>> dataCacheHitRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+ // Per-tablet-server counters fed in fetchData(); used to derive the lookup rate and index/data cache hit rates.
+ private static EventCounter lookupRateTracker = new EventCounter();
+ private static EventCounter indexCacheHitTracker = new EventCounter();
+ private static EventCounter indexCacheRequestTracker = new EventCounter();
+ private static EventCounter dataCacheHitTracker = new EventCounter();
+ private static EventCounter dataCacheRequestTracker = new EventCounter();
+
+ // True while a fetchData() run is in progress; checked and set under the Monitor.class lock.
+ private static volatile boolean fetching = false;
+ // Latest statistics returned by the master; set to null when a fetch attempt fails.
+ private static MasterMonitorInfo mmi;
+ // Problem-report summary refreshed by fetchData().
+ private static Map<String,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap();
+ private static Exception problemException;
+ // Latest garbage collector status, fetched alongside the master statistics.
+ private static GCStatus gcStatus;
+
+ private static Instance instance;
+
+ private static ServerConfiguration config;
+
+ // Embedded web server for the monitor UI (presumably created during startup; not visible in this excerpt).
+ private static EmbeddedWebServer server;
+
+   /**
+    * Tracks, per tablet server, the latest and the previous distinct (sample time, event count) samples. The count is
+    * treated as a running total: rates and counts are derived from the difference between the two samples.
+    */
+   private static class EventCounter {
+
+     Map<String,Pair<Long,Long>> prevSamples = new HashMap<String,Pair<Long,Long>>();
+     Map<String,Pair<Long,Long>> samples = new HashMap<String,Pair<Long,Long>>();
+     Set<String> serversUpdated = new HashSet<String>();
+
+     /** Starts an update round; servers not reported before {@link #finishedUpdating()} are forgotten. */
+     void startingUpdates() {
+       serversUpdated.clear();
+     }
+
+     /** Records a sample for {@code name}; an identical repeat of the latest sample is ignored. */
+     void updateTabletServer(String name, long sampleTime, long numEvents) {
+       Pair<Long,Long> latest = new Pair<Long,Long>(sampleTime, numEvents);
+       Pair<Long,Long> previous = samples.get(name);
+
+       boolean changed = previous == null || !previous.equals(latest);
+       if (changed) {
+         if (previous != null) {
+           prevSamples.put(name, previous);
+         }
+         samples.put(name, latest);
+       }
+       serversUpdated.add(name);
+     }
+
+     /** Drops every server that was not reported during the current update round. */
+     void finishedUpdating() {
+       prevSamples.keySet().retainAll(serversUpdated);
+       samples.keySet().retainAll(serversUpdated);
+     }
+
+     /** Sums, over servers with two samples, the event delta divided by the elapsed seconds between the samples. */
+     double calculateRate() {
+       double rate = 0;
+
+       for (String serverName : prevSamples.keySet()) {
+         Pair<Long,Long> before = prevSamples.get(serverName);
+         Pair<Long,Long> after = samples.get(serverName);
+         long events = after.getSecond() - before.getSecond();
+         double elapsedSeconds = (after.getFirst() - before.getFirst()) / 1000.0;
+         rate += events / elapsedSeconds;
+       }
+
+       return rate;
+     }
+
+     /** Sums, over servers with two samples, the difference between the latest and previous event counts. */
+     long calculateCount() {
+       long total = 0;
+
+       for (Entry<String,Pair<Long,Long>> prev : prevSamples.entrySet()) {
+         total += samples.get(prev.getKey()).getSecond() - prev.getValue().getSecond();
+       }
+
+       return total;
+     }
+   }
+
+ public static void fetchData() {
+ double totalIngestRate = 0.;
+ double totalIngestByteRate = 0.;
+ double totalQueryRate = 0.;
+ double totalQueryByteRate = 0.;
+ double totalScanRate = 0.;
+ long totalEntries = 0;
+ int totalTabletCount = 0;
+ int onlineTabletCount = 0;
+ long totalHoldTime = 0;
+ long totalLookups = 0;
+ boolean retry = true;
+
+ // only recalc every so often
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastRecalc < REFRESH_TIME * 1000)
+ return;
+
+ synchronized (Monitor.class) {
+ if (fetching)
+ return;
+ fetching = true;
+ }
+
+ try {
+ while (retry) {
+ MasterClientService.Iface client = null;
+ try {
+ client = MasterClient.getConnection(HdfsZooInstance.getInstance());
+ if (client != null) {
+ mmi = client.getMasterStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()));
+ retry = false;
+ } else {
+ mmi = null;
+ }
+ Monitor.gcStatus = fetchGcStatus();
+ } catch (Exception e) {
+ mmi = null;
+ log.info("Error fetching stats: " + e);
+ } finally {
+ if (client != null) {
+ MasterClient.close(client);
+ }
+ }
+ if (mmi == null)
+ UtilWaitThread.sleep(1000);
+ }
+ if (mmi != null) {
+ int majorCompactions = 0;
+ int minorCompactions = 0;
+
+ lookupRateTracker.startingUpdates();
+ indexCacheHitTracker.startingUpdates();
+ indexCacheRequestTracker.startingUpdates();
+ dataCacheHitTracker.startingUpdates();
+ dataCacheRequestTracker.startingUpdates();
+
+ for (TabletServerStatus server : mmi.tServerInfo) {
+ TableInfo summary = TableInfoUtil.summarizeTableStats(server);
+ totalIngestRate += summary.ingestRate;
+ totalIngestByteRate += summary.ingestByteRate;
+ totalQueryRate += summary.queryRate;
+ totalScanRate += summary.scanRate;
+ totalQueryByteRate += summary.queryByteRate;
+ totalEntries += summary.recs;
+ totalHoldTime += server.holdTime;
+ totalLookups += server.lookups;
+ majorCompactions += summary.majors.running;
+ minorCompactions += summary.minors.running;
+ lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
+ indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
+ indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
+ dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
+ dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
+ }
+
+ lookupRateTracker.finishedUpdating();
+ indexCacheHitTracker.finishedUpdating();
+ indexCacheRequestTracker.finishedUpdating();
+ dataCacheHitTracker.finishedUpdating();
+ dataCacheRequestTracker.finishedUpdating();
+
+ int totalTables = 0;
+ for (TableInfo tInfo : mmi.tableMap.values()) {
+ totalTabletCount += tInfo.tablets;
+ onlineTabletCount += tInfo.onlineTablets;
+ totalTables++;
+ }
+ Monitor.totalIngestRate = totalIngestRate;
+ Monitor.totalTables = totalTables;
+ totalIngestByteRate = totalIngestByteRate / 1000000.0;
+ Monitor.totalIngestByteRate = totalIngestByteRate;
+ Monitor.totalQueryRate = totalQueryRate;
+ Monitor.totalScanRate = totalScanRate;
+ totalQueryByteRate = totalQueryByteRate / 1000000.0;
+ Monitor.totalQueryByteRate = totalQueryByteRate;
+ Monitor.totalEntries = totalEntries;
+ Monitor.totalTabletCount = totalTabletCount;
+ Monitor.onlineTabletCount = onlineTabletCount;
+ Monitor.totalHoldTime = totalHoldTime;
+ Monitor.totalLookups = totalLookups;
+
+ ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
+ ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
+
+ double totalLoad = 0.;
+ for (TabletServerStatus status : mmi.tServerInfo) {
+ if (status != null)
+ totalLoad += status.osLoad;
+ }
+ loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
+
+ minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
+ majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
+
+ lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
+
+ queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
+ queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
+
+ scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
+
+ calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
+ calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
+ }
+ try {
+ Monitor.problemSummary = ProblemReports.getInstance().summarize();
+ Monitor.problemException = null;
+ } catch (Exception e) {
+ log.info("Failed to obtain problem reports ", e);
+ Monitor.problemSummary = Collections.emptyMap();
+ Monitor.problemException = e;
+ }
+
+ } finally {
+ synchronized (Monitor.class) {
+ fetching = false;
+ lastRecalc = currentTime;
+ }
+ }
+ }
+
+ private static void calcCacheHitRate(List<Pair<Long,Double>> hitRate, long currentTime, EventCounter cacheHits, EventCounter cacheReq) {
+ long req = cacheReq.calculateCount();
+ if (req > 0)
+ hitRate.add(new Pair<Long,Double>(currentTime, cacheHits.calculateCount() / (double) cacheReq.calculateCount()));
+ else
+ hitRate.add(new Pair<Long,Double>(currentTime, null));
+ }
+
+ private static GCStatus fetchGcStatus() {
+ GCStatus result = null;
+ HostAndPort address = null;
+ try {
+ // Read the gc location from its lock
+ Instance instance = HdfsZooInstance.getInstance();
+ String zooKeepers = instance.getZooKeepers();
+ log.debug("connecting to zookeepers " + zooKeepers);
+ ZooKeeper zk = new ZooKeeper(zooKeepers, (int) config.getConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {}
+ });
+ try {
+ String path = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZGC_LOCK;
+ List<String> locks = zk.getChildren(path, null);
+ if (locks != null && locks.size() > 0) {
+ Collections.sort(locks);
+ address = new ServerServices(new String(zk.getData(path + "/" + locks.get(0), null, null))).getAddress(Service.GC_CLIENT);
+ GCMonitorService.Client client = ThriftUtil.getClient(new GCMonitorService.Client.Factory(), address, config.getConfiguration());
+ try {
+ result = client.getStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+ } finally {
+ zk.close();
+ }
+ } catch (Exception ex) {
+ log.warn("Unable to contact the garbage collector at " + address, ex);
+ }
+ return result;
+ }
+
+ public static void main(String[] args) throws Exception {
+ SecurityUtil.serverLogin();
+
+ VolumeManager fs = VolumeManagerImpl.get();
+ ServerOpts opts = new ServerOpts();
+ opts.parseArgs("monitor", args);
+ String hostname = opts.getAddress();
+ instance = HdfsZooInstance.getInstance();
+ config = new ServerConfiguration(instance);
+ Accumulo.init(fs, config, "monitor");
+ Monitor monitor = new Monitor();
+ Accumulo.enableTracing(hostname, "monitor");
+ monitor.run(hostname);
+ }
+
+ private static long START_TIME;
+
+ public void run(String hostname) {
+ Monitor.START_TIME = System.currentTimeMillis();
+ int port = config.getConfiguration().getPort(Property.MONITOR_PORT);
+ try {
+ log.debug("Creating monitor on port " + port);
+ server = new EmbeddedWebServer(hostname, port);
+ } catch (Throwable ex) {
+ log.error("Unable to start embedded web server", ex);
+ throw new RuntimeException(ex);
+ }
+
+ server.addServlet(DefaultServlet.class, "/");
+ server.addServlet(OperationServlet.class, "/op");
+ server.addServlet(MasterServlet.class, "/master");
+ server.addServlet(TablesServlet.class, "/tables");
+ server.addServlet(TServersServlet.class, "/tservers");
+ server.addServlet(ProblemServlet.class, "/problems");
+ server.addServlet(GcStatusServlet.class, "/gc");
+ server.addServlet(LogServlet.class, "/log");
+ server.addServlet(XMLServlet.class, "/xml");
+ server.addServlet(JSONServlet.class, "/json");
+ server.addServlet(VisServlet.class, "/vis");
+ server.addServlet(Summary.class, "/trace/summary");
+ server.addServlet(ListType.class, "/trace/listType");
+ server.addServlet(ShowTrace.class, "/trace/show");
+ if (server.isUsingSsl())
+ server.addServlet(ShellServlet.class, "/shell");
+ server.start();
+
+ try {
+ String monitorAddress = HostAndPort.fromParts(hostname, server.getPort()).toString();
+ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, monitorAddress.getBytes(),
+ NodeExistsPolicy.OVERWRITE);
+ log.info("Set monitor address in zookeeper to " + monitorAddress);
+ } catch (Exception ex) {
+ log.error("Unable to set monitor address in zookeeper");
+ }
+ LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID());
+
+ new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start();
+
+ // need to regularly fetch data so plot data is updated
+ new Daemon(new LoggingRunnable(log, new Runnable() {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Monitor.fetchData();
+ } catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ }
+
+ UtilWaitThread.sleep(333);
+ }
+
+ }
+ }), "Data fetcher").start();
+ }
+
+ public static MasterMonitorInfo getMmi() {
+ return mmi;
+ }
+
+ public static int getTotalTables() {
+ return totalTables;
+ }
+
+ public static int getTotalTabletCount() {
+ return totalTabletCount;
+ }
+
+ public static int getOnlineTabletCount() {
+ return onlineTabletCount;
+ }
+
+ public static long getTotalEntries() {
+ return totalEntries;
+ }
+
+ public static double getTotalIngestRate() {
+ return totalIngestRate;
+ }
+
+ public static double getTotalIngestByteRate() {
+ return totalIngestByteRate;
+ }
+
+ public static double getTotalQueryRate() {
+ return totalQueryRate;
+ }
+
+ public static double getTotalScanRate() {
+ return totalScanRate;
+ }
+
+ public static double getTotalQueryByteRate() {
+ return totalQueryByteRate;
+ }
+
+ public static long getTotalHoldTime() {
+ return totalHoldTime;
+ }
+
+ public static Exception getProblemException() {
+ return problemException;
+ }
+
+ public static Map<String,Map<ProblemType,Integer>> getProblemSummary() {
+ return problemSummary;
+ }
+
+ public static GCStatus getGcStatus() {
+ return gcStatus;
+ }
+
+ public static long getTotalLookups() {
+ return totalLookups;
+ }
+
+ public static long getStartTime() {
+ return START_TIME;
+ }
+
+ public static List<Pair<Long,Double>> getLoadOverTime() {
+ synchronized (loadOverTime) {
+ return new ArrayList<Pair<Long,Double>>(loadOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Double>> getIngestRateOverTime() {
+ synchronized (ingestRateOverTime) {
+ return new ArrayList<Pair<Long,Double>>(ingestRateOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Double>> getIngestByteRateOverTime() {
+ synchronized (ingestByteRateOverTime) {
+ return new ArrayList<Pair<Long,Double>>(ingestByteRateOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Integer>> getRecoveriesOverTime() {
+ synchronized (recoveriesOverTime) {
+ return new ArrayList<Pair<Long,Integer>>(recoveriesOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Integer>> getMinorCompactionsOverTime() {
+ synchronized (minorCompactionsOverTime) {
+ return new ArrayList<Pair<Long,Integer>>(minorCompactionsOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Integer>> getMajorCompactionsOverTime() {
+ synchronized (majorCompactionsOverTime) {
+ return new ArrayList<Pair<Long,Integer>>(majorCompactionsOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Double>> getLookupsOverTime() {
+ synchronized (lookupsOverTime) {
+ return new ArrayList<Pair<Long,Double>>(lookupsOverTime);
+ }
+ }
+
+ public static double getLookupRate() {
+ return lookupRateTracker.calculateRate();
+ }
+
+ public static List<Pair<Long,Integer>> getQueryRateOverTime() {
+ synchronized (queryRateOverTime) {
+ return new ArrayList<Pair<Long,Integer>>(queryRateOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Integer>> getScanRateOverTime() {
+ synchronized (scanRateOverTime) {
+ return new ArrayList<Pair<Long,Integer>>(scanRateOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Double>> getQueryByteRateOverTime() {
+ synchronized (queryByteRateOverTime) {
+ return new ArrayList<Pair<Long,Double>>(queryByteRateOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Double>> getIndexCacheHitRateOverTime() {
+ synchronized (indexCacheHitRateOverTime) {
+ return new ArrayList<Pair<Long,Double>>(indexCacheHitRateOverTime);
+ }
+ }
+
+ public static List<Pair<Long,Double>> getDataCacheHitRateOverTime() {
+ synchronized (dataCacheHitRateOverTime) {
+ return new ArrayList<Pair<Long,Double>>(dataCacheHitRateOverTime);
+ }
+ }
+
+ public static AccumuloConfiguration getSystemConfiguration() {
+ return config.getConfiguration();
+ }
+
+ public static Instance getInstance() {
+ return instance;
+ }
+
+ public static boolean isUsingSsl() {
+ return server.isUsingSsl();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java
new file mode 100644
index 0000000..a4a8911
--- /dev/null
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor;
+
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.TTimeoutTransport;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.log4j.Logger;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.google.common.net.HostAndPort;
+
+public class ZooKeeperStatus implements Runnable {
+
+ private static final Logger log = Logger.getLogger(ZooKeeperStatus.class);
+
+ private volatile boolean stop = false;
+
+ public static class ZooKeeperState implements Comparable<ZooKeeperState> {
+ public final String keeper;
+ public final String mode;
+ public final int clients;
+
+ public ZooKeeperState(String keeper, String mode, int clients) {
+ this.keeper = keeper;
+ this.mode = mode;
+ this.clients = clients;
+ }
+
+ @Override
+ public int compareTo(ZooKeeperState other) {
+ if (this == other) {
+ return 0;
+ } else if (other == null) {
+ return 1;
+ } else {
+ if (this.keeper == other.keeper) {
+ return 0;
+ } else if (null == this.keeper) {
+ return -1;
+ } else if (null == other.keeper) {
+ return 1;
+ } else {
+ return this.keeper.compareTo(other.keeper);
+ }
+ }
+ }
+ }
+
+ private static SortedSet<ZooKeeperState> status = new TreeSet<ZooKeeperState>();
+
+ public static Collection<ZooKeeperState> getZooKeeperStatus() {
+ return status;
+ }
+
+ public void stop() {
+ this.stop = true;
+ }
+
+ @Override
+ public void run() {
+
+ while (!stop) {
+
+ TreeSet<ZooKeeperState> update = new TreeSet<ZooKeeperState>();
+
+ String zookeepers[] = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST).split(",");
+ for (String keeper : zookeepers) {
+ int clients = 0;
+ String mode = "unknown";
+
+ String[] parts = keeper.split(":");
+ TTransport transport = null;
+ try {
+ HostAndPort addr;
+ if (parts.length > 1)
+ addr = HostAndPort.fromParts(parts[0], Integer.parseInt(parts[1]));
+ else
+ addr = HostAndPort.fromParts(parts[0], 2181);
+
+ transport = TTimeoutTransport.create(addr, 10 * 1000l);
+ transport.write("stat\n".getBytes(), 0, 5);
+ StringBuilder response = new StringBuilder();
+ try {
+ transport.flush();
+ byte[] buffer = new byte[1024 * 100];
+ int n = 0;
+ while ((n = transport.read(buffer, 0, buffer.length)) > 0) {
+ response.append(new String(buffer, 0, n));
+ }
+ } catch (TTransportException ex) {
+ // happens at EOF
+ }
+ for (String line : response.toString().split("\n")) {
+ if (line.startsWith(" "))
+ clients++;
+ if (line.startsWith("Mode"))
+ mode = line.split(":")[1];
+ }
+ update.add(new ZooKeeperState(keeper, mode, clients));
+ } catch (Exception ex) {
+ log.info("Exception talking to zookeeper " + keeper, ex);
+ update.add(new ZooKeeperState(keeper, "Down", -1));
+ } finally {
+ if (transport != null) {
+ try {
+ transport.close();
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ }
+ }
+ }
+ status = update;
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+}