You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:23 UTC
[44/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManager.java b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManager.java
new file mode 100644
index 0000000..f03b04b
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+
+/**
+ * A MemoryManager in Accumulo currently determines when minor compactions should occur and when ingest should be put on hold. The goal of a memory manager
+ * implementation is to maximize ingest throughput and minimize the number of minor compactions.
+ */
+
+public interface MemoryManager {
+
+ /**
+ * Initialize the memory manager.
+ *
+ * @param conf
+ *          the server configuration handed to the implementation
+ */
+ void init(ServerConfiguration conf);
+
+ /**
+ * An implementation of this function will be called periodically by Accumulo and should return a list of tablets to minor compact.
+ *
+ * Instructing a tablet that is already minor compacting (this can be inferred from the TabletState) to minor compact has no effect.
+ *
+ * Holding all ingest does not affect metadata tablets.
+ *
+ * @param tablets
+ *          per-tablet state on which to base the decision
+ * @return the actions to take, including the tablets to minor compact (how a hold on ingest is expressed is defined by MemoryManagementActions, which is not
+ *         shown here)
+ */
+
+ MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets);
+
+ /**
+ * This method is called when a tablet is closed. A memory manager can clean up any per-tablet state it is keeping when this is called.
+ *
+ * @param extent
+ *          the extent of the tablet that was closed
+ */
+ void tabletClosed(KeyExtent extent);
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
new file mode 100644
index 0000000..aeacb8d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+/**
+ * Read-only description of a single tablet, as passed in a list to MemoryManager.getMemoryManagementActions.
+ *
+ * NOTE(review): the units and exact meaning of the values below are not defined in this file; the descriptions are inferred from the method names and should
+ * be confirmed against the implementing class.
+ */
+public interface TabletState {
+ /**
+ * @return the KeyExtent of the tablet this state describes
+ */
+ KeyExtent getExtent();
+
+ /**
+ * @return presumably the time of the most recent commit to the tablet (units and epoch not visible here; TODO confirm)
+ */
+ long getLastCommitTime();
+
+ /**
+ * @return presumably the size of the tablet's in-memory table (likely bytes; TODO confirm)
+ */
+ long getMemTableSize();
+
+ /**
+ * @return presumably the size of the in-memory table that is currently being minor compacted (likely bytes; TODO confirm)
+ */
+ long getMinorCompactingMemTableSize();
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFSDataInputStream.java b/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFSDataInputStream.java
new file mode 100644
index 0000000..5162e01
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFSDataInputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.trace;
+
+import java.io.IOException;
+
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+
+/**
+ * An FSDataInputStream that wraps another FSDataInputStream and surrounds each seek and positioned read with a trace span before forwarding the call to the
+ * wrapped stream. Methods that are not overridden here are handled by the superclass, which is constructed over the same wrapped stream.
+ */
+public class TraceFSDataInputStream extends FSDataInputStream {
+
+ /** The stream every traced call is forwarded to. */
+ private final FSDataInputStream impl;
+
+ /**
+  * @param in
+  *          the stream to wrap; it is also handed to the superclass constructor
+  */
+ public TraceFSDataInputStream(FSDataInputStream in) throws IOException {
+   super(in);
+   impl = in;
+ }
+
+ /** Seeks the wrapped stream inside a "FSDataInputStream.seek" span. */
+ @Override
+ public synchronized void seek(long desired) throws IOException {
+   final Span seekSpan = Trace.start("FSDataInputStream.seek");
+   try {
+     impl.seek(desired);
+   } finally {
+     seekSpan.stop();
+   }
+ }
+
+ /** Positioned read inside a "FSDataInputStream.read" span; the requested length is attached to the span when tracing is on. */
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+   final Span readSpan = Trace.start("FSDataInputStream.read");
+   if (Trace.isTracing()) {
+     readSpan.data("length", Integer.toString(length));
+   }
+   try {
+     final int bytesRead = impl.read(position, buffer, offset, length);
+     return bytesRead;
+   } finally {
+     readSpan.stop();
+   }
+ }
+
+ /** Positioned readFully of {@code length} bytes inside a "FSDataInputStream.readFully" span. */
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+   final Span readSpan = Trace.start("FSDataInputStream.readFully");
+   if (Trace.isTracing()) {
+     readSpan.data("length", Integer.toString(length));
+   }
+   try {
+     impl.readFully(position, buffer, offset, length);
+   } finally {
+     readSpan.stop();
+   }
+ }
+
+ /** Positioned readFully of the whole buffer inside a "FSDataInputStream.readFully" span; the buffer length is attached to the span when tracing is on. */
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+   final Span readSpan = Trace.start("FSDataInputStream.readFully");
+   if (Trace.isTracing()) {
+     readSpan.data("length", Integer.toString(buffer.length));
+   }
+   try {
+     impl.readFully(position, buffer);
+   } finally {
+     readSpan.stop();
+   }
+ }
+
+ /** Forwards seekToNewSource to the wrapped stream inside a "FSDataInputStream.seekToNewSource" span. */
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+   final Span seekSpan = Trace.start("FSDataInputStream.seekToNewSource");
+   try {
+     final boolean switched = impl.seekToNewSource(targetPos);
+     return switched;
+   } finally {
+     seekSpan.stop();
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java b/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
new file mode 100644
index 0000000..71cc562
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.trace;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+// If FileSystem was an interface, we could use a Proxy, but it's not, so we have to override everything manually
+
+public class TraceFileSystem extends FileSystem {
+
+ @Override
+ public void setConf(Configuration conf) {
+ Span span = Trace.start("setConf");
+ try {
+ if (impl != null)
+ impl.setConf(conf);
+ else
+ super.setConf(conf);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ Span span = Trace.start("getConf");
+ try {
+ return impl.getConf();
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+ Span span = Trace.start("getFileBlockLocations");
+ try {
+ return impl.getFileBlockLocations(file, start, len);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataInputStream open(Path f) throws IOException {
+ Span span = Trace.start("open");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return new TraceFSDataInputStream(impl.open(f));
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.create(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.create(f, overwrite);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+
+ return impl.create(f, progress);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, short replication) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.create(f, replication);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.create(f, replication, progress);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.create(f, overwrite, bufferSize);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.create(f, overwrite, bufferSize, progress);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.create(f, overwrite, bufferSize, replication, blockSize);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+ Span span = Trace.start("create");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.create(f, overwrite, bufferSize, replication, blockSize, progress);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public boolean createNewFile(Path f) throws IOException {
+ Span span = Trace.start("createNewFile");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.createNewFile(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f) throws IOException {
+ Span span = Trace.start("append");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.append(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
+ Span span = Trace.start("append");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.append(f, bufferSize);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Deprecated
+ @Override
+ public short getReplication(Path src) throws IOException {
+ Span span = Trace.start("getReplication");
+ if (Trace.isTracing())
+ span.data("path", src.toString());
+ try {
+ return impl.getFileStatus(src).getReplication();
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public boolean setReplication(Path src, short replication) throws IOException {
+ Span span = Trace.start("setReplication");
+ if (Trace.isTracing())
+ span.data("path", src.toString());
+ try {
+ return impl.setReplication(src, replication);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public boolean exists(Path f) throws IOException {
+ Span span = Trace.start("exists");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.exists(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ /**
+  * Reports whether {@code f} is a directory, forwarding to the wrapped file system inside an "isDirectory" trace span.
+  *
+  * The call is forwarded to {@code impl.isDirectory(f)} (mirroring how isFile is forwarded) instead of being re-implemented on top of getFileStatus, so the
+  * wrapped file system's own handling of a non-existent path is preserved rather than surfacing an exception from getFileStatus.
+  *
+  * @param f
+  *          path to test; recorded on the span as "path" when tracing is on
+  * @return whatever the wrapped file system reports for {@code f}
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Deprecated
+ @Override
+ public boolean isDirectory(Path f) throws IOException {
+   Span span = Trace.start("isDirectory");
+   if (Trace.isTracing())
+     span.data("path", f.toString());
+   try {
+     return impl.isDirectory(f);
+   } finally {
+     span.stop();
+   }
+ }
+
+ @Override
+ public boolean isFile(Path f) throws IOException {
+ Span span = Trace.start("isFile");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.isFile(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public long getLength(Path f) throws IOException {
+ Span span = Trace.start("getLength");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.getLength(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ Span span = Trace.start("getContentSummary");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.getContentSummary(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
+ Span span = Trace.start("listStatus");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.listStatus(f, filter);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path[] files) throws IOException {
+ Span span = Trace.start("listStatus");
+ try {
+ return impl.listStatus(files);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
+ Span span = Trace.start("listStatus");
+ try {
+ return impl.listStatus(files, filter);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+ Span span = Trace.start("globStatus");
+ if (Trace.isTracing())
+ span.data("pattern", pathPattern.toString());
+ try {
+ return impl.globStatus(pathPattern);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
+ Span span = Trace.start("globStatus");
+ if (Trace.isTracing())
+ span.data("pattern", pathPattern.toString());
+ try {
+ return impl.globStatus(pathPattern, filter);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public Path getHomeDirectory() {
+ Span span = Trace.start("getHomeDirectory");
+ try {
+ return impl.getHomeDirectory();
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public boolean mkdirs(Path f) throws IOException {
+ Span span = Trace.start("mkdirs");
+ if (Trace.isTracing())
+ span.data("path", f.toString());
+ try {
+ return impl.mkdirs(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void copyFromLocalFile(Path src, Path dst) throws IOException {
+ Span span = Trace.start("copyFromLocalFile");
+ if (Trace.isTracing()) {
+ span.data("src", src.toString());
+ span.data("dst", dst.toString());
+ }
+ try {
+ impl.copyFromLocalFile(src, dst);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void moveFromLocalFile(Path[] srcs, Path dst) throws IOException {
+ Span span = Trace.start("moveFromLocalFile");
+ if (Trace.isTracing()) {
+ span.data("dst", dst.toString());
+ }
+ try {
+ impl.moveFromLocalFile(srcs, dst);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void moveFromLocalFile(Path src, Path dst) throws IOException {
+ Span span = Trace.start("moveFromLocalFile");
+ if (Trace.isTracing()) {
+ span.data("src", src.toString());
+ span.data("dst", dst.toString());
+ }
+ try {
+ impl.moveFromLocalFile(src, dst);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+ Span span = Trace.start("copyFromLocalFile");
+ if (Trace.isTracing()) {
+ span.data("src", src.toString());
+ span.data("dst", dst.toString());
+ }
+ try {
+ impl.copyFromLocalFile(delSrc, src, dst);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst) throws IOException {
+ Span span = Trace.start("copyFromLocalFile");
+ if (Trace.isTracing()) {
+ span.data("dst", dst.toString());
+ }
+ try {
+ impl.copyFromLocalFile(delSrc, overwrite, srcs, dst);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException {
+ Span span = Trace.start("copyFromLocalFile");
+ if (Trace.isTracing()) {
+ span.data("src", src.toString());
+ span.data("dst", dst.toString());
+ }
+ try {
+ impl.copyFromLocalFile(delSrc, overwrite, src, dst);
+ } finally {
+ span.stop();
+ }
+ }
+
+ /**
+  * Copies {@code src} from the wrapped file system to the local path {@code dst} inside a "copyToLocalFile" trace span.
+  *
+  * @param src
+  *          source path; recorded on the span as "src" when tracing is on
+  * @param dst
+  *          destination path; recorded on the span as "dst" when tracing is on
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public void copyToLocalFile(Path src, Path dst) throws IOException {
+   // The span is named after this method so it is not confused with the copyFromLocalFile spans.
+   Span span = Trace.start("copyToLocalFile");
+   if (Trace.isTracing()) {
+     span.data("src", src.toString());
+     span.data("dst", dst.toString());
+   }
+   try {
+     impl.copyToLocalFile(src, dst);
+   } finally {
+     span.stop();
+   }
+ }
+
+ @Override
+ public void moveToLocalFile(Path src, Path dst) throws IOException {
+ Span span = Trace.start("moveToLocalFile");
+ if (Trace.isTracing()) {
+ span.data("src", src.toString());
+ span.data("dst", dst.toString());
+ }
+ try {
+ impl.moveToLocalFile(src, dst);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+ Span span = Trace.start("copyToLocalFile");
+ if (Trace.isTracing()) {
+ span.data("src", src.toString());
+ span.data("dst", dst.toString());
+ }
+ try {
+ impl.copyToLocalFile(delSrc, src, dst);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException {
+ Span span = Trace.start("startLocalOutput");
+ if (Trace.isTracing()) {
+ span.data("out", fsOutputFile.toString());
+ span.data("local", tmpLocalFile.toString());
+ }
+ try {
+ return impl.startLocalOutput(fsOutputFile, tmpLocalFile);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException {
+ Span span = Trace.start("completeLocalOutput");
+ if (Trace.isTracing()) {
+ span.data("out", fsOutputFile.toString());
+ span.data("local", tmpLocalFile.toString());
+ }
+ try {
+ impl.completeLocalOutput(fsOutputFile, tmpLocalFile);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ Span span = Trace.start("close");
+ try {
+ impl.close();
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public long getUsed() throws IOException {
+ Span span = Trace.start("getUsed");
+ try {
+ return impl.getUsed();
+ } finally {
+ span.stop();
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public long getBlockSize(Path f) throws IOException {
+ Span span = Trace.start("getBlockSize");
+ if (Trace.isTracing()) {
+ span.data("path", f.toString());
+ }
+ try {
+ return impl.getBlockSize(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Deprecated
+ @Override
+ public long getDefaultBlockSize() {
+ Span span = Trace.start("getDefaultBlockSize");
+ try {
+ return impl.getDefaultBlockSize();
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Deprecated
+ @Override
+ public short getDefaultReplication() {
+ Span span = Trace.start("getDefaultReplication");
+ try {
+ return impl.getDefaultReplication();
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public FileChecksum getFileChecksum(Path f) throws IOException {
+ Span span = Trace.start("getFileChecksum");
+ if (Trace.isTracing()) {
+ span.data("path", f.toString());
+ }
+ try {
+ return impl.getFileChecksum(f);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void setVerifyChecksum(boolean verifyChecksum) {
+ Span span = Trace.start("setVerifyChecksum");
+ try {
+ impl.setVerifyChecksum(verifyChecksum);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void setPermission(Path p, FsPermission permission) throws IOException {
+ Span span = Trace.start("setPermission");
+ if (Trace.isTracing()) {
+ span.data("path", p.toString());
+ }
+ try {
+ impl.setPermission(p, permission);
+ } finally {
+ span.stop();
+ }
+ }
+
+ @Override
+ public void setOwner(Path p, String username, String groupname) throws IOException {
+ Span span = Trace.start("setOwner");
+ if (Trace.isTracing()) {
+ span.data("path", p.toString());
+ span.data("user", username);
+ span.data("group", groupname);
+ }
+
+ try {
+ impl.setOwner(p, username, groupname);
+ } finally {
+ span.stop();
+ }
+ }
+
+ /**
+  * Forwards setTimes to the wrapped file system inside a "setTimes" trace span.
+  *
+  * @param p
+  *          path whose times are set; recorded on the span as "path" when tracing is on, matching setPermission and setOwner
+  * @param mtime
+  *          modification time, passed through unchanged
+  * @param atime
+  *          access time, passed through unchanged
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public void setTimes(Path p, long mtime, long atime) throws IOException {
+   Span span = Trace.start("setTimes");
+   if (Trace.isTracing()) {
+     span.data("path", p.toString());
+   }
+   try {
+     impl.setTimes(p, mtime, atime);
+   } finally {
+     span.stop();
+   }
+ }
+
+ final FileSystem impl;
+
+ TraceFileSystem(FileSystem impl) {
+ ArgumentChecker.notNull(impl);
+ this.impl = impl;
+ }
+
+ public FileSystem getImplementation() {
+ return impl;
+ }
+
+ @Override
+ public URI getUri() {
+ Span span = Trace.start("getUri");
+ try {
+ return impl.getUri();
+ } finally {
+ span.stop();
+ }
+ }
+
+ /**
+  * Opens {@code f} on the wrapped file system inside an "open" trace span and returns the stream wrapped in a TraceFSDataInputStream.
+  *
+  * @param f
+  *          path to open; recorded on the span as "path" when tracing is on, matching open(Path)
+  * @param bufferSize
+  *          passed through unchanged to the wrapped file system
+  * @return a tracing wrapper around the stream returned by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+   Span span = Trace.start("open");
+   if (Trace.isTracing()) {
+     span.data("path", f.toString());
+   }
+   try {
+     return new TraceFSDataInputStream(impl.open(f, bufferSize));
+   } finally {
+     span.stop();
+   }
+ }
+
+ /**
+  * Creates {@code f} on the wrapped file system inside a "create" trace span.
+  *
+  * All arguments, including {@code permission}, are forwarded to the wrapped file system's overload with the same signature so the requested permission is
+  * actually applied instead of being replaced by the wrapped file system's default.
+  *
+  * @param f
+  *          path to create; recorded on the span as "path" when tracing is on
+  * @param permission
+  *          permission requested for the new file
+  * @param overwrite
+  *          passed through unchanged
+  * @param bufferSize
+  *          passed through unchanged
+  * @param replication
+  *          passed through unchanged
+  * @param blockSize
+  *          passed through unchanged
+  * @param progress
+  *          passed through unchanged
+  * @return the output stream returned by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)
+     throws IOException {
+   Span span = Trace.start("create");
+   if (Trace.isTracing()) {
+     span.data("path", f.toString());
+   }
+   try {
+     return impl.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+   } finally {
+     span.stop();
+   }
+ }
+
+ @Override
+ public void initialize(URI name, Configuration conf) throws IOException {
+ Span span = Trace.start("initialize");
+ try {
+ impl.initialize(name, conf);
+ } finally {
+ span.stop();
+ }
+ }
+
+ /**
+  * Forwards append to the wrapped file system inside an "append" trace span.
+  *
+  * @param f
+  *          path to append to; recorded on the span as "path" when tracing is on, matching the other append overloads
+  * @param bufferSize
+  *          passed through unchanged
+  * @param progress
+  *          passed through unchanged
+  * @return the output stream returned by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+   Span span = Trace.start("append");
+   if (Trace.isTracing()) {
+     span.data("path", f.toString());
+   }
+   try {
+     return impl.append(f, bufferSize, progress);
+   } finally {
+     span.stop();
+   }
+ }
+
+ /**
+  * Forwards rename to the wrapped file system inside a "rename" trace span.
+  *
+  * @param src
+  *          path to rename; recorded on the span as "src" when tracing is on
+  * @param dst
+  *          new path; recorded on the span as "dst" when tracing is on
+  * @return the result reported by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+   Span span = Trace.start("rename");
+   if (Trace.isTracing()) {
+     span.data("src", src.toString());
+     span.data("dst", dst.toString());
+   }
+   try {
+     return impl.rename(src, dst);
+   } finally {
+     span.stop();
+   }
+ }
+
+ /**
+  * Forwards the single-argument delete to the wrapped file system inside a "delete" trace span.
+  *
+  * @param f
+  *          path to delete; recorded on the span as "path" when tracing is on
+  * @return the result reported by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean delete(Path f) throws IOException {
+   Span span = Trace.start("delete");
+   if (Trace.isTracing()) {
+     span.data("path", f.toString());
+   }
+   try {
+     return impl.delete(f);
+   } finally {
+     span.stop();
+   }
+ }
+
+ /**
+  * Forwards delete to the wrapped file system inside a "delete" trace span.
+  *
+  * @param f
+  *          path to delete; recorded on the span as "path" when tracing is on
+  * @param recursive
+  *          passed through unchanged
+  * @return the result reported by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+   Span span = Trace.start("delete");
+   if (Trace.isTracing()) {
+     span.data("path", f.toString());
+   }
+   try {
+     return impl.delete(f, recursive);
+   } finally {
+     span.stop();
+   }
+ }
+
+ /**
+  * Forwards listStatus to the wrapped file system inside a "listStatus" trace span.
+  *
+  * @param f
+  *          path to list; recorded on the span as "path" when tracing is on, matching listStatus(Path, PathFilter)
+  * @return the statuses returned by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public FileStatus[] listStatus(Path f) throws IOException {
+   Span span = Trace.start("listStatus");
+   if (Trace.isTracing()) {
+     span.data("path", f.toString());
+   }
+   try {
+     return impl.listStatus(f);
+   } finally {
+     span.stop();
+   }
+ }
+
+ /**
+  * Forwards setWorkingDirectory to the wrapped file system inside a "setWorkingDirectory" trace span.
+  *
+  * @param new_dir
+  *          the new working directory; recorded on the span as "path" when tracing is on
+  */
+ @Override
+ public void setWorkingDirectory(Path new_dir) {
+   Span span = Trace.start("setWorkingDirectory");
+   if (Trace.isTracing()) {
+     span.data("path", new_dir.toString());
+   }
+   try {
+     impl.setWorkingDirectory(new_dir);
+   } finally {
+     span.stop();
+   }
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ Span span = Trace.start("getWorkingDirectory");
+ try {
+ return impl.getWorkingDirectory();
+ } finally {
+ span.stop();
+ }
+ }
+
+ /**
+  * Forwards mkdirs to the wrapped file system inside a "mkdirs" trace span.
+  *
+  * @param f
+  *          directory to create; recorded on the span as "path" when tracing is on, matching mkdirs(Path)
+  * @param permission
+  *          passed through unchanged
+  * @return the result reported by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+   Span span = Trace.start("mkdirs");
+   if (Trace.isTracing()) {
+     span.data("path", f.toString());
+   }
+   try {
+     return impl.mkdirs(f, permission);
+   } finally {
+     span.stop();
+   }
+ }
+
+ /**
+  * Forwards getFileStatus to the wrapped file system inside a "getFileStatus" trace span.
+  *
+  * @param f
+  *          path to look up; recorded on the span as "path" when tracing is on
+  * @return the status returned by the wrapped file system
+  * @throws IOException
+  *           if the wrapped file system fails
+  */
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+   Span span = Trace.start("getFileStatus");
+   if (Trace.isTracing()) {
+     span.data("path", f.toString());
+   }
+   try {
+     return impl.getFileStatus(f);
+   } finally {
+     span.stop();
+   }
+ }
+
+ public static FileSystem wrap(FileSystem fileSystem) {
+ return new TraceFileSystem(fileSystem);
+ }
+
+ public static FileSystem getAndWrap(Configuration conf) throws IOException {
+ return wrap(FileSystem.get(conf));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/ActionStatsUpdator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ActionStatsUpdator.java b/server/base/src/main/java/org/apache/accumulo/server/util/ActionStatsUpdator.java
new file mode 100644
index 0000000..dd4573b
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ActionStatsUpdator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import org.apache.accumulo.core.tabletserver.thrift.ActionStats;
+
+/**
+ * Static helper that accumulates one ActionStats record into another.
+ */
+public class ActionStatsUpdator {
+
+ /**
+  * Adds the status, elapsed, num, count, sumDev, queueTime, queueSumDev and fail fields of {@code td} to the same-named fields of {@code summary}.
+  * {@code summary} is modified in place; {@code td} is only read.
+  *
+  * @param summary
+  *          the running total to update
+  * @param td
+  *          the values to add to the total
+  */
+ public static void update(ActionStats summary, ActionStats td) {
+   // outcome counters
+   summary.status += td.status;
+   summary.fail += td.fail;
+   summary.num += td.num;
+   summary.count += td.count;
+
+   // elapsed-time figures
+   summary.elapsed += td.elapsed;
+   summary.sumDev += td.sumDev;
+
+   // queue-time figures
+   summary.queueTime += td.queueTime;
+   summary.queueSumDev += td.queueSumDev;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java b/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
new file mode 100644
index 0000000..813d54c
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.Parameter;
+
+public class AddFilesWithMissingEntries {
+
+  static final Logger log = Logger.getLogger(AddFilesWithMissingEntries.class);
+
+  /** Command-line options: the standard client options plus the switch that enables writing to the metadata table. */
+  public static class Opts extends ClientOpts {
+    @Parameter(names = "-update", description = "Make changes to the " + MetadataTable.NAME + " table to include missing files")
+    boolean update = false;
+  }
+
+  /**
+   * A utility to add files to the {@value MetadataTable#NAME} table that exist in a tablet's directory but are not referenced by that tablet's metadata
+   * entries. This is a recovery tool for someone who knows what they are doing. It might be better to save off files, and recover your instance by
+   * re-initializing and importing the existing files.
+   * <p>
+   * The tablets section of the metadata table is scanned. For every tablet the directory entry and the data file entries are collected, and the files found
+   * in that directory are compared against the recorded ones. Unrecorded files are always counted and the total is logged; entries for them are written only
+   * when {@code -update} is given.
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(AddFilesWithMissingEntries.class.getName(), args, bwOpts);
+
+    final Scanner scanner = opts.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(MetadataSchema.TabletsSection.getRange());
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.get(conf);
+
+    KeyExtent last = new KeyExtent();
+    String directory = null;
+    Set<String> knownFiles = new HashSet<String>();
+
+    int count = 0;
+    final MultiTableBatchWriter writer = opts.getConnector().createMultiTableBatchWriter(bwOpts.getBatchWriterConfig());
+    try {
+      // collect the list of known files and the directory for each extent
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        KeyExtent ke = new KeyExtent(key.getRow(), (Text) null);
+        // when the key extent changes
+        if (!ke.equals(last)) {
+          if (directory != null) {
+            // add any files in the directory unknown to the key extent
+            count += addUnknownFiles(fs, directory, knownFiles, last, writer, opts.update);
+          }
+          directory = null;
+          knownFiles.clear();
+          last = ke;
+        }
+        if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+          directory = entry.getValue().toString();
+          log.debug("Found directory " + directory + " for row " + key.getRow().toString());
+        } else if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
+          String filename = key.getColumnQualifier().toString();
+          knownFiles.add(filename);
+          log.debug("METADATA file found: " + filename);
+        }
+      }
+      if (directory != null) {
+        // catch the last key extent
+        count += addUnknownFiles(fs, directory, knownFiles, last, writer, opts.update);
+      }
+      log.info("There were " + count + " files that are unknown to the metadata table");
+    } finally {
+      // close even when the scan or the file listing fails, so the batch writer is never left open
+      writer.close();
+    }
+  }
+
+  /**
+   * Lists {@code directory} of the given tablet under each configured tables directory and counts the files that are not in {@code knownFiles}. Files whose
+   * name ends in {@code _tmp} or {@code _tmp.rf} are ignored.
+   *
+   * @param fs
+   *          file system used to list the tablet directory
+   * @param directory
+   *          the tablet's directory as recorded in the metadata table; it is appended directly to the table id
+   * @param knownFiles
+   *          data file names already recorded for the tablet
+   * @param ke
+   *          the tablet; supplies the table id and the metadata row
+   * @param writer
+   *          receives one mutation per unknown file when {@code update} is true
+   * @param update
+   *          when false the files are only counted
+   * @return the number of unknown files found
+   */
+  private static int addUnknownFiles(FileSystem fs, String directory, Set<String> knownFiles, KeyExtent ke, MultiTableBatchWriter writer, boolean update)
+      throws Exception {
+    int count = 0;
+    final String tableId = ke.getTableId().toString();
+    final Text row = ke.getMetadataEntry();
+    log.info(row.toString());
+    for (String dir : ServerConstants.getTablesDirs()) {
+      final Path path = new Path(dir + "/" + tableId + directory);
+      // The tablet directory may be absent under some of the configured tables directories. Listing a missing path either throws or yields null,
+      // depending on the Hadoop version, so skip it instead of aborting the whole run.
+      if (!fs.exists(path)) {
+        log.debug("Skipping " + path + ", it does not exist");
+        continue;
+      }
+      final FileStatus[] files = fs.listStatus(path);
+      if (files == null)
+        continue;
+      for (FileStatus file : files) {
+        final String name = file.getPath().getName();
+        if (name.endsWith("_tmp") || name.endsWith("_tmp.rf"))
+          continue;
+        final String filename = directory + "/" + name;
+        if (!knownFiles.contains(filename)) {
+          count++;
+          if (update) {
+            final Mutation m = new Mutation(row);
+            String size = Long.toString(file.getLen());
+            String entries = "1"; // lie: the real number of entries is not known here
+            String value = size + "," + entries;
+            m.put(DataFileColumnFamily.NAME, new Text(filename), new Value(value.getBytes()));
+            writer.getBatchWriter(MetadataTable.NAME).addMutation(m);
+          }
+        }
+      }
+    }
+    return count;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
new file mode 100644
index 0000000..2f5576c
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.impl.ClientExec;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+
+public class Admin {
+ private static final Logger log = Logger.getLogger(Admin.class);
+
+ /** Options that apply to every command. {@code force} is handed to {@code stopTabletServer} by the {@code stop} command. */
+ static class AdminOpts extends ClientOpts {
+ @Parameter(names = {"-f", "--force"}, description = "force the given server to stop by removing its lock")
+ boolean force = false;
+ }
+
+ /** Arguments of the {@code stop} command: the hosts to stop. */
+ @Parameters(commandDescription = "stop the tablet server on the given hosts")
+ static class StopCommand {
+ @Parameter(description = "<host> {<host> ... }")
+ List<String> args = new ArrayList<String>();
+ }
+
+ /** Arguments of the {@code ping} command: the hosts to ping; an empty list means all tablet servers. */
+ @Parameters(commandDescription = "Ping tablet servers. If no arguments, pings all.")
+ static class PingCommand {
+ @Parameter(description = "{<host> ... }")
+ List<String> args = new ArrayList<String>();
+ }
+
+ /** Options of the {@code checkTablets} command. */
+ @Parameters(commandDescription = "print tablets that are offline in online tables")
+ static class CheckTabletsCommand {
+ @Parameter(names = "--fixFiles", description = "Remove dangling file pointers")
+ boolean fixFiles = false;
+
+ // null means every table is checked
+ @Parameter(names = {"-t", "--table"}, description = "Table to check, if not set checks all tables")
+ String table = null;
+ }
+
+ /** The {@code stopMaster} command; it takes no options. */
+ @Parameters(commandDescription = "stop the master")
+ static class StopMasterCommand {}
+
+ /** The {@code stopAll} command; it takes no options. */
+ @Parameters(commandDescription = "stop all the servers")
+ static class StopAllCommand {}
+
+ /** Options of the {@code listInstances} command. */
+ @Parameters(commandDescription = "list Accumulo instances in zookeeper")
+ static class ListInstancesCommand {
+ @Parameter(names = "--print-errors", description = "display errors while listing instances")
+ boolean printErrors = false;
+ @Parameter(names = "--print-all", description = "print information for all instances, not just those with names")
+ boolean printAll = false;
+ }
+
+ /** Options of the {@code dumpConfig} command; they are consumed by {@code printConfig}. */
+ @Parameters(commandDescription = "print out non-default configuration settings")
+ static class DumpConfigCommand {
+ @Parameter(names = {"-t", "--tables"}, description = "print per-table configuration")
+ List<String> tables = new ArrayList<String>();
+ // when set, the -s and -t selections are not consulted
+ @Parameter(names = {"-a", "--all"}, description = "print the system and all table configurations")
+ boolean allConfiguration = false;
+ @Parameter(names = {"-s", "--system"}, description = "print the system configuration")
+ boolean systemConfiguration = false;
+ @Parameter(names = {"-p", "--permissions"}, description = "print user permissions (must be used in combination with -a, -s, or -t)")
+ boolean userPermissions = false;
+ // null leaves the output files without a parent directory
+ @Parameter(names = {"-d", "--directory"}, description = "directory to place config files")
+ String directory = null;
+ }
+
+ /**
+  * Command-line entry point. Parses the arguments, runs the selected command and terminates the JVM with a non-zero status on failure: 1 for an
+  * {@link AccumuloException}, 2 for an {@link AccumuloSecurityException}, 3 for any other exception, 4 when a ping fails, 5 when offline tablets are
+  * found and 6 when the missing-file check reports a problem. Usage is printed when help is requested or no command is given.
+  */
+ public static void main(String[] args) {
+   AdminOpts opts = new AdminOpts();
+   JCommander cl = new JCommander(opts);
+   cl.setProgramName(Admin.class.getName());
+
+   // each command gets its own options object
+   CheckTabletsCommand checkTabletsCommand = new CheckTabletsCommand();
+   cl.addCommand("checkTablets", checkTabletsCommand);
+
+   ListInstancesCommand listInstancesOpts = new ListInstancesCommand();
+   cl.addCommand("listInstances", listInstancesOpts);
+
+   PingCommand pingCommand = new PingCommand();
+   cl.addCommand("ping", pingCommand);
+
+   DumpConfigCommand dumpConfigCommand = new DumpConfigCommand();
+   cl.addCommand("dumpConfig", dumpConfigCommand);
+
+   StopCommand stopOpts = new StopCommand();
+   cl.addCommand("stop", stopOpts);
+   StopAllCommand stopAllOpts = new StopAllCommand();
+   cl.addCommand("stopAll", stopAllOpts);
+   StopMasterCommand stopMasterOpts = new StopMasterCommand();
+   cl.addCommand("stopMaster", stopMasterOpts);
+   cl.parse(args);
+
+   final String command = cl.getParsedCommand();
+   if (opts.help || command == null) {
+     cl.usage();
+     return;
+   }
+   Instance instance = opts.getInstance();
+
+   try {
+     // use the credentials from the command line when a token was supplied, the system credentials otherwise
+     final String principal;
+     final AuthenticationToken token;
+     if (opts.getToken() != null) {
+       principal = opts.principal;
+       token = opts.getToken();
+     } else {
+       principal = SystemCredentials.get().getPrincipal();
+       token = SystemCredentials.get().getToken();
+     }
+
+     int rc = 0;
+
+     if ("listInstances".equals(command)) {
+       ListInstances.listInstances(instance.getZooKeepers(), listInstancesOpts.printAll, listInstancesOpts.printErrors);
+     } else if ("ping".equals(command)) {
+       if (ping(instance, principal, token, pingCommand.args) != 0)
+         rc = 4;
+     } else if ("checkTablets".equals(command)) {
+       System.out.println("\n*** Looking for offline tablets ***\n");
+       if (FindOfflineTablets.findOffline(instance, new Credentials(principal, token), checkTabletsCommand.table) != 0)
+         rc = 5;
+       System.out.println("\n*** Looking for missing files ***\n");
+       final boolean filesMissing;
+       if (checkTabletsCommand.table == null)
+         filesMissing = RemoveEntriesForMissingFiles.checkAllTables(instance, principal, token, checkTabletsCommand.fixFiles) != 0;
+       else
+         filesMissing = RemoveEntriesForMissingFiles.checkTable(instance, principal, token, checkTabletsCommand.table, checkTabletsCommand.fixFiles) != 0;
+       if (filesMissing)
+         rc = 6;
+     } else if ("stop".equals(command)) {
+       stopTabletServer(instance, new Credentials(principal, token), stopOpts.args, opts.force);
+     } else if ("dumpConfig".equals(command)) {
+       printConfig(instance, principal, token, dumpConfigCommand);
+     } else {
+       // the remaining commands are stopMaster and stopAll; only stopAll flushes first and stops the tablet servers too
+       final boolean everything = "stopAll".equals(command);
+       if (everything)
+         flushAll(instance, principal, token);
+
+       stopServer(instance, new Credentials(principal, token), everything);
+     }
+
+     if (rc != 0)
+       System.exit(rc);
+   } catch (AccumuloException e) {
+     log.error(e, e);
+     System.exit(1);
+   } catch (AccumuloSecurityException e) {
+     log.error(e, e);
+     System.exit(2);
+   } catch (Exception e) {
+     log.error(e, e);
+     System.exit(3);
+   }
+ }
+
+ private static int ping(Instance instance, String principal, AuthenticationToken token, List<String> args) throws AccumuloException,
+ AccumuloSecurityException {
+
+ InstanceOperations io = instance.getConnector(principal, token).instanceOperations();
+
+ if (args.size() == 0) {
+ args = io.getTabletServers();
+ }
+
+ int unreachable = 0;
+
+ for (String tserver : args) {
+ try {
+ io.ping(tserver);
+ System.out.println(tserver + " OK");
+ } catch (AccumuloException ae) {
+ System.out.println(tserver + " FAILED (" + ae.getMessage() + ")");
+ unreachable++;
+ }
+ }
+
+ System.out.printf("\n%d of %d tablet servers unreachable\n\n", unreachable, args.size());
+ return unreachable;
+ }
+
+ /**
+  * Flushing during shutdown is a performance optimization, it is not required. This method makes an attempt to initiate flushes of all tables except
+  * {@code MetadataTable.NAME} on a daemon thread and gives up if that takes too long: it waits up to 3 seconds, then keeps waiting in 1 second steps while
+  * new flushes are still being started, until roughly 15 seconds have passed since the start.
+  * <p>
+  * If the calling thread is interrupted while waiting, the wait ends immediately and the interrupt status is restored for the caller.
+  */
+ private static void flushAll(final Instance instance, final String principal, final AuthenticationToken token) throws AccumuloException,
+     AccumuloSecurityException {
+
+   final AtomicInteger flushesStarted = new AtomicInteger(0);
+
+   Runnable flushTask = new Runnable() {
+
+     @Override
+     public void run() {
+       try {
+         Connector conn = instance.getConnector(principal, token);
+         Set<String> tables = conn.tableOperations().tableIdMap().keySet();
+         for (String table : tables) {
+           if (table.equals(MetadataTable.NAME))
+             continue;
+           try {
+             conn.tableOperations().flush(table, null, null, false);
+             flushesStarted.incrementAndGet();
+           } catch (TableNotFoundException e) {
+             // presumably the table was removed after the listing was taken; there is nothing to flush, move on
+           }
+         }
+       } catch (Exception e) {
+         log.warn("Failed to intiate flush " + e.getMessage());
+       }
+     }
+   };
+
+   // daemon thread: a stuck flush must not keep the JVM alive
+   Thread flusher = new Thread(flushTask);
+   flusher.setDaemon(true);
+   flusher.start();
+
+   long start = System.currentTimeMillis();
+   try {
+     flusher.join(3000);
+
+     while (flusher.isAlive() && System.currentTimeMillis() - start < 15000) {
+       int flushCount = flushesStarted.get();
+       flusher.join(1000);
+
+       if (flushCount == flushesStarted.get()) {
+         // no progress was made while waiting for join... maybe its stuck, stop waiting on it
+         break;
+       }
+     }
+   } catch (InterruptedException e) {
+     // stop waiting, and restore the interrupt status so the caller can still see the interrupt
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ private static void stopServer(final Instance instance, final Credentials credentials, final boolean tabletServersToo) throws AccumuloException,
+ AccumuloSecurityException {
+ MasterClient.execute(instance, new ClientExec<MasterClientService.Client>() {
+ @Override
+ public void execute(MasterClientService.Client client) throws Exception {
+ client.shutdown(Tracer.traceInfo(), credentials.toThrift(instance), tabletServersToo);
+ }
+ });
+ }
+
+ private static void stopTabletServer(final Instance instance, final Credentials creds, List<String> servers, final boolean force) throws AccumuloException,
+ AccumuloSecurityException {
+ for (String server : servers) {
+ HostAndPort address = AddressUtil.parseAddress(server);
+ final String finalServer = address.toString();
+ log.info("Stopping server " + finalServer);
+ MasterClient.execute(instance, new ClientExec<MasterClientService.Client>() {
+ @Override
+ public void execute(MasterClientService.Client client) throws Exception {
+ client.shutdownTabletServer(Tracer.traceInfo(), creds.toThrift(instance), finalServer, force);
+ }
+ });
+ }
+ }
+
+ // Output file names and line templates used by printConfig and its helpers.
+ // NOTE(review): the templates look like Accumulo shell commands (config / grant) - confirm.
+ private static final String ACCUMULO_SITE_BACKUP_FILE = "accumulo-site.xml.bak";
+ private static final String PERMISSION_FILE_SUFFIX = "_perm.cfg";
+ // {0} = table name, {1} = "key=value"
+ private static final MessageFormat configFormat = new MessageFormat("config -t {0} -s {1}\n");
+ // {0} = permission name, {1} = user
+ private static final MessageFormat sysPermFormat = new MessageFormat("grant System.{0} -s -u {1}\n");
+ // {0} = permission name, {1} = table name, {2} = user
+ private static final MessageFormat tablePermFormat = new MessageFormat("grant Table.{0} -t {1} -u {2}\n");
+
+ // Static state assigned by printConfig and read by the helpers it calls.
+ private static DefaultConfiguration defaultConfig;
+ private static Map<String,String> siteConfig, systemConfig;
+ // assigned by printConfig only when user permissions were requested
+ private static List<String> localUsers;
+
+ /**
+  * Implements the {@code dumpConfig} command. The default, site and system configurations are loaded into the static fields of this class, then the system
+  * configuration and/or the table configurations selected by {@code opts} are written out through the two {@code print*Configuration} helpers.
+  *
+  * @throws IllegalArgumentException
+  *           if an output directory was given but is not a directory or is not writable
+  */
+ public static void printConfig(Instance instance, String principal, AuthenticationToken token, DumpConfigCommand opts) throws Exception {
+
+   File outputDirectory = null;
+   if (opts.directory != null) {
+     outputDirectory = new File(opts.directory);
+     if (!outputDirectory.isDirectory()) {
+       throw new IllegalArgumentException(opts.directory + " does not exist on the local filesystem.");
+     }
+     if (!outputDirectory.canWrite()) {
+       throw new IllegalArgumentException(opts.directory + " is not writable");
+     }
+   }
+
+   final Connector connector = instance.getConnector(principal, token);
+   defaultConfig = AccumuloConfiguration.getDefaultConfiguration();
+   final InstanceOperations instanceOps = connector.instanceOperations();
+   siteConfig = instanceOps.getSiteConfiguration();
+   systemConfig = instanceOps.getSystemConfiguration();
+   if (opts.userPermissions) {
+     localUsers = Lists.newArrayList(connector.securityOperations().listLocalUsers());
+     Collections.sort(localUsers);
+   }
+
+   // "all" implies the system configuration plus every existing table; otherwise only what was asked for
+   final boolean everything = opts.allConfiguration;
+   if (everything || opts.systemConfiguration) {
+     printSystemConfiguration(connector, outputDirectory, opts.userPermissions);
+   }
+
+   final Iterable<String> tableNames;
+   if (everything) {
+     tableNames = connector.tableOperations().list();
+   } else {
+     tableNames = opts.tables;
+   }
+   for (String tableName : tableNames) {
+     printTableConfiguration(connector, tableName, outputDirectory, opts.userPermissions);
+   }
+ }
+
+ private static String getDefaultConfigValue(String key) {
+ if (null == key)
+ return null;
+
+ String defaultValue = null;
+ try {
+ Property p = Property.getPropertyByKey(key);
+ if (null == p)
+ return defaultValue;
+ defaultValue = defaultConfig.get(p);
+ } catch (IllegalArgumentException e) {}
+ return defaultValue;
+ }
+
+ /**
+  * Writes the non-default system-wide configuration as XML to the file named by {@code ACCUMULO_SITE_BACKUP_FILE} and, when requested, the system
+  * permissions of every local user to {@code "system" + PERMISSION_FILE_SUFFIX}.
+  * <p>
+  * A site property is written when its value differs from the default and the key is not also present in the system configuration; a system property is
+  * written when its value differs from the default.
+  *
+  * @param outputDirectory
+  *          parent directory for the files; when null, {@link File} resolves the file names without a parent
+  * @param userPermissions
+  *          whether to write the permission file; {@code localUsers} must have been populated in that case
+  */
+ private static void printSystemConfiguration(Connector connector, File outputDirectory, boolean userPermissions) throws IOException, AccumuloException,
+     AccumuloSecurityException {
+   Configuration conf = new Configuration(false);
+   for (Entry<String,String> prop : siteConfig.entrySet()) {
+     String defaultValue = getDefaultConfigValue(prop.getKey());
+     if (!prop.getValue().equals(defaultValue) && !systemConfig.containsKey(prop.getKey())) {
+       conf.set(prop.getKey(), prop.getValue());
+     }
+   }
+   for (Entry<String,String> prop : systemConfig.entrySet()) {
+     String defaultValue = getDefaultConfigValue(prop.getKey());
+     if (!prop.getValue().equals(defaultValue)) {
+       conf.set(prop.getKey(), prop.getValue());
+     }
+   }
+   File siteBackup = new File(outputDirectory, ACCUMULO_SITE_BACKUP_FILE);
+   FileOutputStream fos = new FileOutputStream(siteBackup);
+   try {
+     conf.writeXml(fos);
+   } finally {
+     fos.close();
+   }
+   if (userPermissions) {
+     File permScript = new File(outputDirectory, "system" + PERMISSION_FILE_SUFFIX);
+     FileWriter writer = new FileWriter(permScript);
+     try {
+       for (String principal : localUsers) {
+         for (SystemPermission perm : SystemPermission.values()) {
+           if (connector.securityOperations().hasSystemPermission(principal, perm)) {
+             writer.write(sysPermFormat.format(new String[] {perm.name(), principal}));
+           }
+         }
+       }
+     } finally {
+       // a failed permission lookup or write must not leave the file handle open
+       writer.close();
+     }
+   }
+ }
+
+ /**
+  * Writes the configuration of one table to {@code <tableName>.cfg} and, when requested, the table permissions of every local user to
+  * {@code tableName + PERMISSION_FILE_SUFFIX}.
+  * <p>
+  * Only properties whose key starts with the table prefix are considered. A property is written when its value differs from the default (or no default
+  * is known) and also differs from both the site and the system value for the same key.
+  *
+  * @param outputDirectory
+  *          parent directory for the files; when null, {@link File} resolves the file names without a parent
+  * @param userPermissions
+  *          whether to write the permission file; {@code localUsers} must have been populated in that case
+  */
+ private static void printTableConfiguration(Connector connector, String tableName, File outputDirectory, boolean userPermissions) throws AccumuloException,
+     TableNotFoundException, IOException, AccumuloSecurityException {
+   Iterable<Entry<String,String>> tableConfig = connector.tableOperations().getProperties(tableName);
+   File tableBackup = new File(outputDirectory, tableName + ".cfg");
+   FileWriter writer = new FileWriter(tableBackup);
+   try {
+     for (Entry<String,String> prop : tableConfig) {
+       if (prop.getKey().startsWith(Property.TABLE_PREFIX.getKey())) {
+         String defaultValue = getDefaultConfigValue(prop.getKey());
+         if (defaultValue == null || !defaultValue.equals(prop.getValue())) {
+           if (!prop.getValue().equals(siteConfig.get(prop.getKey())) && !prop.getValue().equals(systemConfig.get(prop.getKey()))) {
+             writer.write(configFormat.format(new String[] {tableName, prop.getKey() + "=" + prop.getValue()}));
+           }
+         }
+       }
+     }
+   } finally {
+     // close on every path so a failed write does not leak the file handle
+     writer.close();
+   }
+
+   if (userPermissions) {
+     File permScript = new File(outputDirectory, tableName + PERMISSION_FILE_SUFFIX);
+     FileWriter permWriter = new FileWriter(permScript);
+     try {
+       for (String principal : localUsers) {
+         for (TablePermission perm : TablePermission.values()) {
+           if (connector.securityOperations().hasTablePermission(principal, tableName, perm)) {
+             permWriter.write(tablePermFormat.format(new String[] {perm.name(), tableName, principal}));
+           }
+         }
+       }
+     } finally {
+       permWriter.close();
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java b/server/base/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java
new file mode 100644
index 0000000..0fd1c78
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import com.beust.jcommander.Parameter;
+
+public class ChangeSecret {
+
+  /** Command-line options: the standard client options plus the old and the new ZooKeeper password. */
+  static class Opts extends ClientOpts {
+    @Parameter(names="--old", description="old zookeeper password", password=true, hidden=true)
+    String oldPass;
+    @Parameter(names="--new", description="new zookeeper password", password=true, hidden=true)
+    String newPass;
+  }
+
+  /**
+   * Copies the instance's ZooKeeper data to a newly generated instance id using the new password, records the new id in HDFS and, when an old password was
+   * given, deletes the data stored under the old id. Exits with status -1 before changing anything if ephemeral nodes are found under the instance.
+   */
+  public static void main(String[] args) throws Exception {
+    // "--old" and "--new" are always placed in front of the caller's arguments
+    // NOTE(review): presumably this makes JCommander ask for both passwords (password=true) - confirm
+    final List<String> fullArgs = new ArrayList<String>(args.length + 2);
+    fullArgs.add("--old");
+    fullArgs.add("--new");
+    fullArgs.addAll(Arrays.asList(args));
+
+    final Opts opts = new Opts();
+    opts.parseArgs(ChangeSecret.class.getName(), fullArgs.toArray(new String[0]));
+
+    final FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+    final Instance inst = opts.getInstance();
+    if (!verifyAccumuloIsDown(inst, opts.oldPass))
+      System.exit(-1);
+
+    final String instanceId = rewriteZooKeeperInstance(inst, opts.oldPass, opts.newPass);
+    updateHdfs(fs, inst, instanceId);
+    if (opts.oldPass != null) {
+      deleteInstance(inst, opts.oldPass);
+    }
+    System.out.println("New instance id is " + instanceId);
+    System.out.println("Be sure to put your new secret in accumulo-site.xml");
+  }
+
+  /** Callback that {@code recurse} applies to every node it walks. */
+  interface Visitor {
+    void visit(ZooReader zoo, String path) throws Exception;
+  }
+
+  /**
+   * Visits {@code root} and then, recursively, each of its children. Any exception raised by the visitor or by the child listing is rethrown wrapped in a
+   * {@link RuntimeException}.
+   */
+  private static void recurse(ZooReader zoo, String root, Visitor v) {
+    try {
+      v.visit(zoo, root);
+      for (String childName : zoo.getChildren(root)) {
+        final String childPath = root + "/" + childName;
+        recurse(zoo, childPath, v);
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Walks the instance's ZooKeeper tree looking for ephemeral nodes.
+   *
+   * @return true when none exist; otherwise the offending paths are printed to stderr and false is returned
+   */
+  private static boolean verifyAccumuloIsDown(Instance inst, String oldPassword) {
+    final ZooReader zooReader = new ZooReaderWriter(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), oldPassword);
+    final String root = ZooUtil.getRoot(inst);
+    final List<String> ephemerals = new ArrayList<String>();
+    recurse(zooReader, root, new Visitor() {
+      @Override
+      public void visit(ZooReader zoo, String path) throws Exception {
+        // a non-zero ephemeral owner marks the node as ephemeral
+        if (zoo.getStatus(path).getEphemeralOwner() != 0)
+          ephemerals.add(path);
+      }
+    });
+
+    if (!ephemerals.isEmpty()) {
+      System.err.println("The following ephemeral nodes exist, something is still running:");
+      for (String path : ephemerals) {
+        System.err.println(path);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Copies every node under the instance root to the same path with the instance id replaced by a new random UUID, writing through a connection that uses
+   * {@code newPass}, and then points the instance name at the new id.
+   *
+   * @return the new instance id
+   */
+  private static String rewriteZooKeeperInstance(final Instance inst, String oldPass, String newPass) throws Exception {
+    final ZooReaderWriter orig = new ZooReaderWriter(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), oldPass);
+    final IZooReaderWriter new_ = new ZooReaderWriter(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), newPass);
+    final String newInstanceId = UUID.randomUUID().toString();
+    final String root = ZooUtil.getRoot(inst);
+    recurse(orig, root, new Visitor() {
+      @Override
+      public void visit(ZooReader zoo, String path) throws Exception {
+        final String newPath = path.replace(inst.getInstanceID(), newInstanceId);
+        final byte[] data = zoo.getData(path, null);
+        final List<ACL> acls = orig.getZooKeeper().getACL(path, new Stat());
+
+        final boolean makePrivate;
+        if (acls.containsAll(Ids.READ_ACL_UNSAFE)) {
+          makePrivate = false;
+        } else if (acls.containsAll(Ids.OPEN_ACL_UNSAFE)) {
+          // upgrade: make user nodes private, they contain the user's password; everything else can have the readable acl
+          final String[] parts = path.split("/");
+          makePrivate = parts[parts.length - 2].equals("users");
+        } else {
+          makePrivate = true;
+        }
+
+        if (makePrivate) {
+          new_.putPrivatePersistentData(newPath, data, NodeExistsPolicy.FAIL);
+        } else {
+          new_.putPersistentData(newPath, data, NodeExistsPolicy.FAIL);
+        }
+      }
+    });
+
+    // replace the name -> id mapping
+    final String namePath = "/accumulo/instances/" + inst.getInstanceName();
+    orig.recursiveDelete(namePath, NodeMissingPolicy.SKIP);
+    new_.putPersistentData(namePath, newInstanceId.getBytes(), NodeExistsPolicy.OVERWRITE);
+    return newInstanceId;
+  }
+
+  /**
+   * Recreates the instance id location in HDFS so that it contains a single empty file named after the new instance id. {@code inst} is not used.
+   */
+  private static void updateHdfs(FileSystem fs, Instance inst, String newInstanceId) throws IOException {
+    fs.delete(ServerConstants.getInstanceIdLocation(), true);
+    fs.mkdirs(ServerConstants.getInstanceIdLocation());
+    final Path idFile = new Path(ServerConstants.getInstanceIdLocation(), newInstanceId);
+    fs.create(idFile).close();
+  }
+
+  /** Recursively deletes the ZooKeeper data stored under the original instance id, connecting with the old password. */
+  private static void deleteInstance(Instance origInstance, String oldPass) throws Exception {
+    final IZooReaderWriter orig = new ZooReaderWriter(origInstance.getZooKeepers(), origInstance.getZooKeepersSessionTimeOut(), oldPass);
+    final String oldRoot = "/accumulo/" + origInstance.getInstanceID();
+    orig.recursiveDelete(oldRoot, NodeMissingPolicy.SKIP);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java b/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
new file mode 100644
index 0000000..df9900f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Writer;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line sanity check for the tablet entries of the metadata table.
+ * <p>
+ * The tablets section is scanned (through a regular scanner, or through an {@code OfflineMetadataScanner} when {@code --offline} is given) and, table by table,
+ * the chain of prev-end-row / end-row values is verified. Two current-location entries that are not separated by a prev-row entry are reported as well. With
+ * {@code --fix} the tool attempts to repair what it finds. {@link #main(String[])} throws a {@link RuntimeException} if any problem was seen.
+ */
+public class CheckForMetadataProblems {
+  // set as soon as any check reports a problem; read by main() to decide whether to fail
+  private static boolean sawProblems = false;
+
+  /**
+   * Verifies that the tablets of one table form a gap-free chain that starts and ends with a null row.
+   * <p>
+   * Findings are printed to standard out and recorded in the class-wide problem flag. An empty set, a first tablet with a non-null prev end row, or a last
+   * tablet with a non-null end row ends the check immediately. When {@code opts.fix} is set, a tablet whose prev end row does not match the end row of its
+   * predecessor is rewritten through {@code MetadataTableUtil.updateTabletPrevEndRow}.
+   *
+   * @param tablename
+   *          identifier used in the printed messages
+   * @param tablets
+   *          the table's tablets, visited in the set's iteration order
+   * @param opts
+   *          supplies the fix flag and the credentials used for repairs
+   */
+  public static void checkTable(String tablename, TreeSet<KeyExtent> tablets, Opts opts) throws AccumuloSecurityException {
+    if (tablets.isEmpty()) {
+      System.out.println("No entries found in metadata table for table " + tablename);
+      sawProblems = true;
+      return;
+    }
+
+    if (tablets.first().getPrevEndRow() != null) {
+      System.out.println("First entry for table " + tablename + "- " + tablets.first() + " - has non null prev end row");
+      sawProblems = true;
+      return;
+    }
+
+    if (tablets.last().getEndRow() != null) {
+      System.out.println("Last entry for table " + tablename + "- " + tablets.last() + " - has non null end row");
+      sawProblems = true;
+      return;
+    }
+
+    boolean clean = true;
+    boolean atFirst = true;
+    Text previousEndRow = null;
+    for (KeyExtent current : tablets) {
+      if (atFirst) {
+        // the first tablet only seeds the chain; its own prev end row was verified above
+        previousEndRow = current.getEndRow();
+        atFirst = false;
+        continue;
+      }
+
+      boolean broke;
+      if (current.getPrevEndRow() == null) {
+        System.out.println("Table " + tablename + " has null prev end row in middle of table " + current);
+        broke = true;
+      } else if (!current.getPrevEndRow().equals(previousEndRow)) {
+        System.out.println("Table " + tablename + " has a hole " + current.getPrevEndRow() + " != " + previousEndRow);
+        broke = true;
+      } else {
+        broke = false;
+      }
+
+      if (broke) {
+        clean = false;
+        if (opts.fix) {
+          KeyExtent repaired = new KeyExtent(current);
+          repaired.setPrevEndRow(previousEndRow);
+          MetadataTableUtil.updateTabletPrevEndRow(repaired, new Credentials(opts.principal, opts.getToken()));
+          System.out.println("KE " + current + " has been repaired to " + repaired);
+        }
+      }
+
+      previousEndRow = current.getEndRow();
+    }
+
+    if (!clean) {
+      sawProblems = true;
+      return;
+    }
+    System.out.println("All is well for table " + tablename);
+  }
+
+  /**
+   * Scans the tablets section of the metadata table and checks every table found in it.
+   * <p>
+   * Only the prev-row column and the current-location column family are fetched. Tablets are collected per table id; whenever an entry for a table id that is
+   * not currently being collected shows up, the tables gathered so far are checked and discarded. A current-location entry that directly follows another
+   * current-location entry (no prev-row entry in between) is reported and, with {@code opts.fix}, deleted.
+   *
+   * @param opts
+   *          parsed command-line options
+   * @param fs
+   *          volume manager handed to the offline scanner; unused for online scans
+   */
+  public static void checkMetadataTableEntries(Opts opts, VolumeManager fs) throws Exception {
+    Map<String,TreeSet<KeyExtent>> tables = new HashMap<String,TreeSet<KeyExtent>>();
+
+    Scanner scanner;
+    if (!opts.offline) {
+      scanner = opts.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    } else {
+      scanner = new OfflineMetadataScanner(ServerConfiguration.getSystemConfiguration(opts.getInstance()), fs);
+    }
+
+    scanner.setRange(MetadataSchema.TabletsSection.getRange());
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+    scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+
+    Text family = new Text();
+    Text qualifier = new Text();
+    // true while the most recent relevant entry was a current-location entry
+    boolean justLoc = false;
+    int count = 0;
+
+    for (Entry<Key,Value> entry : scanner) {
+      Key key = entry.getKey();
+      family = key.getColumnFamily(family);
+      qualifier = key.getColumnQualifier(qualifier);
+
+      count++;
+
+      String tableName = (new KeyExtent(key.getRow(), (Text) null)).getTableId().toString();
+
+      TreeSet<KeyExtent> tablets = tables.get(tableName);
+      if (tablets == null) {
+        // a different table starts here: verify what has been gathered, then start over
+        checkTables(tables, opts);
+        tables.clear();
+
+        tablets = new TreeSet<KeyExtent>();
+        tables.put(tableName, tablets);
+      }
+
+      if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(family, qualifier)) {
+        tablets.add(new KeyExtent(key.getRow(), entry.getValue()));
+        justLoc = false;
+      } else if (family.equals(TabletsSection.CurrentLocationColumnFamily.NAME)) {
+        if (justLoc) {
+          System.out.println("Problem at key " + key);
+          sawProblems = true;
+          if (opts.fix) {
+            deleteEntry(key, opts);
+          }
+        }
+        justLoc = true;
+      }
+    }
+
+    if (count == 0) {
+      System.err.println("ERROR : " + MetadataTable.NAME + " table is empty");
+      sawProblems = true;
+    }
+
+    // whatever is still collected belongs to the last table(s) seen by the scan
+    checkTables(tables, opts);
+  }
+
+  /** Runs {@link #checkTable(String, TreeSet, Opts)} on every table currently collected in the map. */
+  private static void checkTables(Map<String,TreeSet<KeyExtent>> tables, Opts opts) throws AccumuloSecurityException {
+    for (Entry<String,TreeSet<KeyExtent>> tableEntry : tables.entrySet()) {
+      checkTable(tableEntry.getKey(), tableEntry.getValue(), opts);
+    }
+  }
+
+  /** Writes a delete for the column family/qualifier of the given key; a constraint violation is printed, not propagated. */
+  private static void deleteEntry(Key k, Opts opts) throws Exception {
+    Writer t = MetadataTableUtil.getMetadataTable(new Credentials(opts.principal, opts.getToken()));
+    Mutation m = new Mutation(k.getRow());
+    m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
+    try {
+      t.update(m);
+      System.out.println("Deleted " + k);
+    } catch (ConstraintViolationException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /** Command-line options understood by this tool, in addition to those of {@link ClientOpts}. */
+  static class Opts extends ClientOpts {
+    @Parameter(names = "--fix", description = "best-effort attempt to fix problems found")
+    boolean fix = false;
+
+    @Parameter(names = "--offline", description = "perform the check on the files directly")
+    boolean offline = false;
+  }
+
+  /**
+   * Parses the arguments, runs the metadata check and throws a {@link RuntimeException} if any problem was recorded.
+   *
+   * @param args
+   *          command-line arguments, see {@link Opts}
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(CheckForMetadataProblems.class.getName(), args);
+
+    checkMetadataTableEntries(opts, VolumeManagerImpl.get());
+    opts.stopTracing();
+    if (sawProblems) {
+      throw new RuntimeException();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java b/server/base/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java
new file mode 100644
index 0000000..b7a90d3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line utility that removes from ZooKeeper every node under {@code Constants.ZROOT} that does not belong to the current instance, where the current
+ * instance is the one reported by {@code HdfsZooInstance.getInstance()}.
+ * <p>
+ * Two kinds of nodes are removed: instance-name nodes under {@code Constants.ZINSTANCES} whose stored id differs from the current instance id, and direct
+ * children of the root whose name differs from the current instance id.
+ */
+public class CleanZookeeper {
+
+  private static final Logger log = Logger.getLogger(CleanZookeeper.class);
+
+  static class Opts extends Help {
+    // NOTE(review): this value is parsed but never read in this class; the connection comes from
+    // ZooReaderWriter.getInstance() - confirm whether -z is meant to have an effect
+    @Parameter(names={"-z", "--keepers"}, description="comma separated list of zookeeper hosts")
+    String keepers = "localhost:2181";
+    @Parameter(names={"--password"}, description="the system secret", password=true)
+    String auth;
+  }
+
+  /**
+   * Deletes all ZooKeeper nodes under the Accumulo root that do not belong to the current instance.
+   * <p>
+   * Nodes that cannot be deleted for lack of authorization are logged and skipped. Any other failure ends the clean-up and is printed to standard out; it is
+   * not propagated.
+   *
+   * @param args
+   *          command-line options, see {@link Opts}; when {@code --password} is given it is registered as additional digest authentication for the user
+   *          {@code accumulo}
+   * @throws IOException
+   *           error connecting to accumulo or zookeeper
+   */
+  public static void main(String[] args) throws IOException {
+    Opts opts = new Opts();
+    opts.parseArgs(CleanZookeeper.class.getName(), args);
+
+    String root = Constants.ZROOT;
+    IZooReaderWriter zk = ZooReaderWriter.getInstance();
+    if (opts.auth != null) {
+      // encode with a fixed charset: the platform default would make the digest depend on the JVM's locale settings
+      zk.getZooKeeper().addAuthInfo("digest", ("accumulo:" + opts.auth).getBytes(Constants.UTF8));
+    }
+
+    try {
+      for (String child : zk.getChildren(root)) {
+        if (Constants.ZINSTANCES.equals("/" + child)) {
+          // instance-name nodes: each one stores the id of the instance it names
+          for (String instanceName : zk.getChildren(root + Constants.ZINSTANCES)) {
+            String instanceNamePath = root + Constants.ZINSTANCES + "/" + instanceName;
+            byte[] id = zk.getData(instanceNamePath, null);
+            if (id != null && !new String(id, Constants.UTF8).equals(HdfsZooInstance.getInstance().getInstanceID())) {
+              deleteUnlessUnauthorized(zk, instanceNamePath);
+            }
+          }
+        } else if (!child.equals(HdfsZooInstance.getInstance().getInstanceID())) {
+          deleteUnlessUnauthorized(zk, root + "/" + child);
+        }
+      }
+    } catch (Exception ex) {
+      System.out.println("Error Occurred: " + ex);
+    }
+  }
+
+  /** Recursively deletes {@code path}; a missing-authorization failure is logged as a warning instead of being propagated. */
+  private static void deleteUnlessUnauthorized(IZooReaderWriter zk, String path) throws Exception {
+    try {
+      zk.recursiveDelete(path, NodeMissingPolicy.SKIP);
+    } catch (KeeperException.NoAuthException ex) {
+      log.warn("Unable to delete " + path);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/DefaultMap.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/DefaultMap.java b/server/base/src/main/java/org/apache/accumulo/server/util/DefaultMap.java
new file mode 100644
index 0000000..7038be7
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/DefaultMap.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.HashMap;
+
+/**
+ * A {@link HashMap} whose {@link #get(Object)} supplies and stores a default value for keys that have no (non-null) value yet.
+ * <p>
+ * The value stored for such a key is a fresh object created through the zero-argument constructor of the default object's class. If that instantiation fails
+ * for any reason, the default object itself is stored and returned.
+ */
+public class DefaultMap<K,V> extends HashMap<K,V> {
+  private static final long serialVersionUID = 1L;
+  V dfault;
+
+  /**
+   * @param dfault
+   *          prototype for values of absent keys
+   */
+  public DefaultMap(V dfault) {
+    this.dfault = dfault;
+  }
+
+  /**
+   * Returns the value stored for {@code key}. When the lookup yields {@code null}, a default value is created, stored under {@code key} and returned.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public V get(Object key) {
+    V existing = super.get(key);
+    if (existing != null) {
+      return existing;
+    }
+
+    V created;
+    try {
+      created = construct();
+      super.put((K) key, created);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    return created;
+  }
+
+  /** Creates a new instance of the default object's class, falling back to the default object itself when that fails. */
+  @SuppressWarnings("unchecked")
+  private V construct() {
+    V fresh;
+    try {
+      fresh = (V) dfault.getClass().newInstance();
+    } catch (Exception ex) {
+      fresh = dfault;
+    }
+    return fresh;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java b/server/base/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java
new file mode 100644
index 0000000..a74f2b5
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.zookeeper.KeeperException;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line utility that removes an Accumulo instance from ZooKeeper: both its name node under {@code Constants.ZINSTANCES} and its id subtree directly
+ * under {@code Constants.ZROOT}. The instance may be identified by name or by id.
+ */
+public class DeleteZooInstance {
+
+  static class Opts extends Help {
+    @Parameter(names={"-i", "--instance"}, description="the instance name or id to delete")
+    String instance;
+  }
+
+  // number of times a recursive delete is attempted before giving up
+  private static final int MAX_DELETE_ATTEMPTS = 10;
+
+  /**
+   * Recursively deletes {@code path}, retrying when ZooKeeper reports the node as not empty.
+   * <p>
+   * A missing node is skipped ({@link NodeMissingPolicy#SKIP}). Presumably the not-empty failure occurs when children are created while the delete is in
+   * progress, which is why the operation is simply repeated.
+   *
+   * @param zk
+   *          ZooKeeper access used for the delete
+   * @param path
+   *          node to delete together with its children
+   * @throws Exception
+   *           any failure other than "not empty" immediately; the last "not empty" failure once all attempts are used up, so that callers do not mistake an
+   *           incomplete delete for success
+   */
+  static void deleteRetry(IZooReaderWriter zk, String path) throws Exception {
+    KeeperException.NotEmptyException lastFailure = null;
+    for (int attempt = 0; attempt < MAX_DELETE_ATTEMPTS; attempt++) {
+      try {
+        zk.recursiveDelete(path, NodeMissingPolicy.SKIP);
+        return;
+      } catch (KeeperException.NotEmptyException ex) {
+        // remember the failure and try again
+        lastFailure = ex;
+      }
+    }
+    if (lastFailure != null) {
+      throw lastFailure;
+    }
+  }
+
+  /**
+   * Deletes the instance given with {@code -i}/{@code --instance}. If the value matches neither a known instance name nor a known instance id, nothing is
+   * deleted.
+   *
+   * @param args
+   *          : the name or UUID of the instance to be deleted
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(DeleteZooInstance.class.getName(), args);
+
+    IZooReaderWriter zk = ZooReaderWriter.getInstance();
+    // try instance name:
+    Set<String> instances = new HashSet<String>(zk.getChildren(Constants.ZROOT + Constants.ZINSTANCES));
+    Set<String> uuids = new HashSet<String>(zk.getChildren(Constants.ZROOT));
+    uuids.remove("instances");
+    if (instances.contains(opts.instance)) {
+      String path = Constants.ZROOT + Constants.ZINSTANCES + "/" + opts.instance;
+      // read the id before the name node disappears
+      byte[] data = zk.getData(path, null);
+      deleteRetry(zk, path);
+      // a name node without an id has no id subtree to remove; skipping also avoids deleting "<root>/"
+      if (data != null && data.length > 0) {
+        deleteRetry(zk, Constants.ZROOT + "/" + new String(data, Constants.UTF8));
+      }
+    } else if (uuids.contains(opts.instance)) {
+      // look for the real instance name
+      for (String instance : instances) {
+        String path = Constants.ZROOT + Constants.ZINSTANCES + "/" + instance;
+        byte[] data = zk.getData(path, null);
+        if (data != null && opts.instance.equals(new String(data, Constants.UTF8))) {
+          deleteRetry(zk, path);
+        }
+      }
+      deleteRetry(zk, Constants.ZROOT + "/" + opts.instance);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java b/server/base/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java
new file mode 100644
index 0000000..c688869
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line utility that prints a ZooKeeper subtree to standard out as XML.
+ * <p>
+ * The output is wrapped in a {@code <dump>} element. Every znode becomes a {@code <node>} element, or an {@code <ephemeral>} element when it has an ephemeral
+ * owner, with a {@code name} attribute and, if it holds data, {@code encoding} and {@code value} attributes. The top-level child named {@code zookeeper} is
+ * left out.
+ */
+public class DumpZookeeper {
+
+  static IZooReaderWriter zk = null;
+
+  private static final Logger log = Logger.getLogger(DumpZookeeper.class);
+
+  /** Node data rendered as text, together with the name of the encoding used to render it. */
+  private static class Encoded {
+    public final String encoding;
+    public final String value;
+
+    Encoded(String e, String v) {
+      encoding = e;
+      value = v;
+    }
+  }
+
+  static class Opts extends Help {
+    @Parameter(names = "--root", description = "the root of the znode tree to dump")
+    String root = "/";
+  }
+
+  /**
+   * Dumps the tree below {@code --root}. Failures are logged, not propagated.
+   *
+   * @param args
+   *          command-line options, see {@link Opts}
+   */
+  public static void main(String[] args) {
+    Opts opts = new Opts();
+    opts.parseArgs(DumpZookeeper.class.getName(), args);
+
+    Logger.getRootLogger().setLevel(Level.WARN);
+    PrintStream out = System.out;
+    try {
+      zk = ZooReaderWriter.getInstance();
+
+      write(out, 0, "<dump root='%s'>", escape(opts.root));
+      for (String child : zk.getChildren(opts.root, null)) {
+        if (!child.equals("zookeeper")) {
+          dump(out, opts.root, child, 1);
+        }
+      }
+      write(out, 0, "</dump>");
+    } catch (Exception ex) {
+      log.error(ex, ex);
+    }
+  }
+
+  /**
+   * Writes the element for one znode and, recursively, for all of its children. A node without status information is silently skipped.
+   *
+   * @param out
+   *          destination of the dump
+   * @param root
+   *          path of the parent node
+   * @param child
+   *          name of the node below {@code root}
+   * @param indent
+   *          nesting depth used for indentation
+   */
+  private static void dump(PrintStream out, String root, String child, int indent) throws KeeperException, InterruptedException, UnsupportedEncodingException {
+    String path = root.endsWith("/") ? root + child : root + "/" + child;
+    Stat stat = zk.getStatus(path);
+    if (stat == null)
+      return;
+
+    String type = stat.getEphemeralOwner() != 0 ? "ephemeral" : "node";
+    String name = escape(child);
+    boolean hasChildren = stat.getNumChildren() != 0;
+    // childless nodes are written as a self-closing element
+    String tagEnd = hasChildren ? ">" : "/>";
+
+    if (stat.getDataLength() == 0) {
+      write(out, indent, "<%s name='%s'" + tagEnd, type, name);
+    } else {
+      Encoded value = value(path);
+      write(out, indent, "<%s name='%s' encoding='%s' value='%s'" + tagEnd, type, name, value.encoding, escape(value.value));
+    }
+
+    if (hasChildren) {
+      for (String c : zk.getChildren(path, null)) {
+        dump(out, path, c, indent + 1);
+      }
+      // close with the same element name that was opened
+      write(out, indent, "</%s>", type);
+    }
+  }
+
+  /**
+   * Reads the data of {@code path} and renders it as text: verbatim when every byte lies in the printable ASCII range {@code ' '..'~'}, base64 otherwise.
+   */
+  private static Encoded value(String path) throws KeeperException, InterruptedException, UnsupportedEncodingException {
+    byte[] data = zk.getData(path, null);
+    if (data == null) {
+      // NOTE(review): guards against the data being gone by the time it is read - confirm getData can return null
+      data = new byte[0];
+    }
+    for (byte b : data) {
+      // does this look like simple ascii?
+      if (b < ' ' || b > '~')
+        return new Encoded("base64", new String(Base64.encodeBase64(data), Constants.UTF8));
+    }
+    return new Encoded(Constants.UTF8.name(), new String(data, Constants.UTF8));
+  }
+
+  /** Replaces the five XML special characters with their predefined entities so the text is safe inside an attribute value. */
+  private static String escape(String raw) {
+    StringBuilder sb = new StringBuilder(raw.length());
+    for (int i = 0; i < raw.length(); i++) {
+      char c = raw.charAt(i);
+      switch (c) {
+        case '&':
+          sb.append("&amp;");
+          break;
+        case '<':
+          sb.append("&lt;");
+          break;
+        case '>':
+          sb.append("&gt;");
+          break;
+        case '\'':
+          sb.append("&apos;");
+          break;
+        case '"':
+          sb.append("&quot;");
+          break;
+        default:
+          sb.append(c);
+      }
+    }
+    return sb.toString();
+  }
+
+  /** Prints one formatted line, preceded by one indentation unit per nesting level. */
+  private static void write(PrintStream out, int indent, String fmt, Object... args) {
+    for (int i = 0; i < indent; i++)
+      out.print(" ");
+    out.println(String.format(fmt, args));
+  }
+}