You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:29 UTC
[50/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
new file mode 100644
index 0000000..9e0ac39
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Version constants plus lazily resolved base-directory locations, all derived from the site configuration.
+ * <p>
+ * The directory accessors are methods rather than constants so the Accumulo configuration is only loaded when one of them is first called.
+ */
+public class ServerConstants {
+
+  // versions should never be negative
+  public static final Integer WIRE_VERSION = 2;
+
+  /**
+   * current version reflects the addition of a separate root table (ACCUMULO-1481)
+   */
+  public static final int DATA_VERSION = 6;
+  public static final int PREV_DATA_VERSION = 5;
+
+  private static String[] baseDirs = null;
+  private static String defaultBaseDir = null;
+
+  /**
+   * Resolves the single default base directory and caches it for later calls.
+   * <p>
+   * The directory is {@link Property#INSTANCE_DFS_URI} followed by {@link Property#INSTANCE_DFS_DIR}. When the URI property is unset or empty, the URI of
+   * the default Hadoop {@link FileSystem} is used instead. The concatenation is passed through {@link Path} before being stored.
+   *
+   * @return the cached default base directory
+   * @throws IllegalArgumentException
+   *           if the configured URI does not contain a ':'
+   * @throws RuntimeException
+   *           wrapping the IOException raised while obtaining the default file system
+   */
+  public static synchronized String getDefaultBaseDir() {
+    if (defaultBaseDir == null) {
+      String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+      String dfsUri = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI);
+      String baseDir;
+
+      if (dfsUri == null || dfsUri.isEmpty()) {
+        Configuration hadoopConfig = CachedConfiguration.getInstance();
+        try {
+          baseDir = FileSystem.get(hadoopConfig).getUri().toString() + singleNamespace;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        if (!dfsUri.contains(":"))
+          throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_DFS_URI.getKey() + " got " + dfsUri);
+        baseDir = dfsUri + singleNamespace;
+      }
+
+      defaultBaseDir = new Path(baseDir).toString();
+
+    }
+
+    return defaultBaseDir;
+  }
+
+  // these are functions to delay loading the Accumulo configuration unless we must
+  /**
+   * Resolves every configured base directory and caches the result.
+   * <p>
+   * When {@link Property#INSTANCE_VOLUMES} is unset or empty the result is the single {@link #getDefaultBaseDir()}. Otherwise the property is split on
+   * commas, each entry is trimmed, and {@link Property#INSTANCE_DFS_DIR} is appended to each.
+   *
+   * @return the cached array itself, not a copy; callers must not modify it
+   * @throws IllegalArgumentException
+   *           if a configured volume does not contain a ':'
+   */
+  public static synchronized String[] getBaseDirs() {
+    if (baseDirs == null) {
+      String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+      String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES);
+
+      if (ns == null || ns.isEmpty()) {
+        baseDirs = new String[] {getDefaultBaseDir()};
+      } else {
+        String namespaces[] = ns.split(",");
+        for (int i = 0; i < namespaces.length; i++) {
+          // tolerate whitespace around the commas, e.g. "hdfs://a:8020, hdfs://b:8020"; untrimmed, the second
+          // entry would become a base directory starting with a space
+          namespaces[i] = namespaces[i].trim();
+          if (!namespaces[i].contains(":")) {
+            throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespaces[i]);
+          }
+        }
+        baseDirs = prefix(namespaces, singleNamespace);
+      }
+    }
+
+    return baseDirs;
+  }
+
+  /**
+   * Appends a path element to each base.
+   *
+   * @param bases
+   *          the strings to extend; the array is not modified
+   * @param suffix
+   *          the element to append; one leading '/' is dropped so the separator is not doubled
+   * @return a new array where element i is {@code bases[i] + "/" + suffix}
+   */
+  public static String[] prefix(String bases[], String suffix) {
+    if (suffix.startsWith("/"))
+      suffix = suffix.substring(1);
+    String result[] = new String[bases.length];
+    for (int i = 0; i < bases.length; i++) {
+      result[i] = bases[i] + "/" + suffix;
+    }
+    return result;
+  }
+
+  public static final String TABLE_DIR = "tables";
+  public static final String RECOVERY_DIR = "recovery";
+  public static final String WAL_DIR = "wal";
+
+  /** @return {@link #TABLE_DIR} under every base directory */
+  public static String[] getTablesDirs() {
+    return prefix(getBaseDirs(), TABLE_DIR);
+  }
+
+  /** @return {@link #RECOVERY_DIR} under every base directory */
+  public static String[] getRecoveryDirs() {
+    return prefix(getBaseDirs(), RECOVERY_DIR);
+  }
+
+  /** @return {@link #WAL_DIR} under every base directory */
+  public static String[] getWalDirs() {
+    return prefix(getBaseDirs(), WAL_DIR);
+  }
+
+  /** @return "walogArchive" under every base directory */
+  public static String[] getWalogArchives() {
+    return prefix(getBaseDirs(), "walogArchive");
+  }
+
+  /** @return "instance_id" under the first base directory */
+  public static Path getInstanceIdLocation() {
+    return new Path(getBaseDirs()[0], "instance_id");
+  }
+
+  /** @return "version" under the first base directory */
+  public static Path getDataVersionLocation() {
+    return new Path(getBaseDirs()[0], "version");
+  }
+
+  /** @return the {@link RootTable#ID} directory under every tables directory */
+  public static String[] getRootTableDirs() {
+    return prefix(getTablesDirs(), RootTable.ID);
+  }
+
+  /** @return the {@link MetadataTable#ID} directory under every tables directory */
+  public static String[] getMetadataTableDirs() {
+    return prefix(getTablesDirs(), MetadataTable.ID);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java b/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
new file mode 100644
index 0000000..95fee8f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import org.apache.accumulo.core.cli.Help;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Options holder that extends {@link Help} with a bind-address parameter.
+ */
+public class ServerOpts extends Help {
+  @Parameter(names = {"-a", "--address"}, description = "address to bind to")
+  String address = null;
+
+  /**
+   * @return the value given with -a / --address, or "0.0.0.0" when none was supplied
+   */
+  public String getAddress() {
+    return address == null ? "0.0.0.0" : address;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
new file mode 100644
index 0000000..53f5ac2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.cli;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+/**
+ * Variant of the core ClientOnDefaultTable options that sets the principal to "root" and falls back to {@link HdfsZooInstance} when no instance name was
+ * given.
+ */
+public class ClientOnDefaultTable extends org.apache.accumulo.core.cli.ClientOnDefaultTable {
+  {
+    principal = "root";
+  }
+
+  public ClientOnDefaultTable(String table) {
+    super(table);
+  }
+
+  /**
+   * Returns the cached instance, creating it on the first call: a {@link MockInstance} when mock is set, otherwise {@link HdfsZooInstance} when no instance
+   * name is set, otherwise a {@link ZooKeeperInstance} for the configured name and zookeepers.
+   */
+  @Override
+  public synchronized Instance getInstance() {
+    if (cachedInstance == null) {
+      if (mock) {
+        cachedInstance = new MockInstance(instance);
+      } else if (instance == null) {
+        cachedInstance = HdfsZooInstance.getInstance();
+      } else {
+        cachedInstance = new ZooKeeperInstance(this.instance, this.zookeepers);
+      }
+    }
+    return cachedInstance;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
new file mode 100644
index 0000000..e9e9bf1
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.cli;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+/**
+ * Variant of the core ClientOnRequiredTable options that sets the principal to "root" and falls back to {@link HdfsZooInstance} when no instance name was
+ * given.
+ */
+public class ClientOnRequiredTable extends org.apache.accumulo.core.cli.ClientOnRequiredTable {
+  {
+    principal = "root";
+  }
+
+  /**
+   * Returns the cached instance, creating it on the first call: a {@link MockInstance} when mock is set, otherwise {@link HdfsZooInstance} when no instance
+   * name is set, otherwise a {@link ZooKeeperInstance} for the configured name and zookeepers.
+   */
+  @Override
+  public synchronized Instance getInstance() {
+    if (cachedInstance == null) {
+      if (mock) {
+        cachedInstance = new MockInstance(instance);
+      } else if (instance == null) {
+        cachedInstance = HdfsZooInstance.getInstance();
+      } else {
+        cachedInstance = new ZooKeeperInstance(this.instance, this.zookeepers);
+      }
+    }
+    return cachedInstance;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
new file mode 100644
index 0000000..6f3516a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.cli;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+/**
+ * Variant of the core ClientOpts that sets the principal to "root" and falls back to {@link HdfsZooInstance} when no instance name was given.
+ */
+public class ClientOpts extends org.apache.accumulo.core.cli.ClientOpts {
+
+  {
+    principal = "root";
+  }
+
+  /**
+   * Builds an instance on every call (nothing is cached here): a {@link MockInstance} when mock is set, otherwise {@link HdfsZooInstance} when no instance
+   * name is set, otherwise a {@link ZooKeeperInstance} for the configured name and zookeepers.
+   */
+  @Override
+  public Instance getInstance() {
+    final Instance result;
+    if (mock) {
+      result = new MockInstance(instance);
+    } else if (instance == null) {
+      result = HdfsZooInstance.getInstance();
+    } else {
+      result = new ZooKeeperInstance(this.instance, this.zookeepers);
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
new file mode 100644
index 0000000..606941d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -0,0 +1,776 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.StopWatch;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.trace.instrument.TraceRunnable;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TServiceClient;
+
+public class BulkImporter {
+
+ private static final Logger log = Logger.getLogger(BulkImporter.class);
+
+ /**
+  * Runs a bulk import with a freshly constructed importer and reports the files that ended up as complete failures.
+  *
+  * @param errorDir
+  *          converted to a {@link Path} and handed to {@link #importFiles(List, Path)}
+  * @return the string form of every path in the resulting statistics' completeFailures map
+  */
+ public static List<String> bulkLoad(AccumuloConfiguration conf, Instance instance, Credentials creds, long tid, String tableId, List<String> files,
+     String errorDir, boolean setTime) throws IOException, AccumuloException, AccumuloSecurityException, ThriftTableOperationException {
+   BulkImporter importer = new BulkImporter(conf, instance, creds, tid, tableId, setTime);
+   AssignmentStats stats = importer.importFiles(files, new Path(errorDir));
+
+   List<String> failedFiles = new ArrayList<String>();
+   for (Path failed : stats.completeFailures.keySet()) {
+     failedFiles.add(failed.toString());
+   }
+   return failedFiles;
+ }
+
+ private StopWatch<Timers> timer; // created and started in importFiles(); read by printReport()
+
+ private static enum Timers { // phases timed during an import; printReport() sums every phase except TOTAL
+ EXAMINE_MAP_FILES, QUERY_METADATA, IMPORT_MAP_FILES, SLEEP, TOTAL
+ }
+
+ private Instance instance; // used to obtain the tablet locator and the configuration for tablet lookups
+ private Credentials credentials; // passed to tablet lookups and to file assignment
+ private String tableId; // table whose tablets are located and assigned to
+ private long tid; // presumably the bulk-import transaction id -- TODO confirm against callers
+ private AccumuloConfiguration acuConf; // source of the thread-count and retry-limit properties
+ private boolean setTime; // stored by the constructor; NOTE(review): confirm where it is consumed
+
+ /**
+  * Stores the given settings in the corresponding fields; does nothing else.
+  */
+ public BulkImporter(AccumuloConfiguration conf, Instance instance, Credentials credentials, long tid, String tableId, boolean setTime) {
+   this.acuConf = conf;
+   this.instance = instance;
+   this.credentials = credentials;
+   this.tableId = tableId;
+   this.tid = tid;
+   this.setTime = setTime;
+ }
+
+ public AssignmentStats importFiles(List<String> files, Path failureDir) throws IOException, AccumuloException, AccumuloSecurityException,
+ ThriftTableOperationException {
+
+ int numThreads = acuConf.getCount(Property.TSERV_BULK_PROCESS_THREADS);
+ int numAssignThreads = acuConf.getCount(Property.TSERV_BULK_ASSIGNMENT_THREADS);
+
+ timer = new StopWatch<Timers>(Timers.class);
+ timer.start(Timers.TOTAL);
+
+ Configuration conf = CachedConfiguration.getInstance();
+ VolumeManagerImpl.get(acuConf);
+ final VolumeManager fs = VolumeManagerImpl.get(acuConf);
+
+ Set<Path> paths = new HashSet<Path>();
+ for (String file : files) {
+ paths.add(new Path(file));
+ }
+ AssignmentStats assignmentStats = new AssignmentStats(paths.size());
+
+ final Map<Path,List<KeyExtent>> completeFailures = Collections.synchronizedSortedMap(new TreeMap<Path,List<KeyExtent>>());
+
+ ClientService.Client client = null;
+ final TabletLocator locator = TabletLocator.getLocator(instance, new Text(tableId));
+
+ try {
+ final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
+
+ timer.start(Timers.EXAMINE_MAP_FILES);
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
+
+ for (Path path : paths) {
+ final Path mapFile = path;
+ Runnable getAssignments = new Runnable() {
+ @Override
+ public void run() {
+ List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
+ try {
+ tabletsToAssignMapFileTo = findOverlappingTablets(instance.getConfiguration(), fs, locator, mapFile, credentials);
+ } catch (Exception ex) {
+ log.warn("Unable to find tablets that overlap file " + mapFile.toString());
+ }
+ log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size() + " tablets");
+ if (tabletsToAssignMapFileTo.size() == 0) {
+ List<KeyExtent> empty = Collections.emptyList();
+ completeFailures.put(mapFile, empty);
+ } else
+ assignments.put(mapFile, tabletsToAssignMapFileTo);
+ }
+ };
+ threadPool.submit(new TraceRunnable(new LoggingRunnable(log, getAssignments)));
+ }
+ threadPool.shutdown();
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ timer.stop(Timers.EXAMINE_MAP_FILES);
+
+ assignmentStats.attemptingAssignments(assignments);
+ Map<Path,List<KeyExtent>> assignmentFailures = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+ numThreads);
+ assignmentStats.assignmentsFailed(assignmentFailures);
+
+ Map<Path,Integer> failureCount = new TreeMap<Path,Integer>();
+
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet())
+ failureCount.put(entry.getKey(), 1);
+
+ long sleepTime = 2 * 1000;
+ while (assignmentFailures.size() > 0) {
+ sleepTime = Math.min(sleepTime * 2, 60 * 1000);
+ locator.invalidateCache();
+ // assumption about assignment failures is that it caused by a split
+ // happening or a missing location
+ //
+ // for splits we need to find children key extents that cover the
+ // same key range and are contiguous (no holes, no overlap)
+
+ timer.start(Timers.SLEEP);
+ UtilWaitThread.sleep(sleepTime);
+ timer.stop(Timers.SLEEP);
+
+ log.debug("Trying to assign " + assignmentFailures.size() + " map files that previously failed on some key extents");
+ assignments.clear();
+
+ // for failed key extents, try to find children key extents to
+ // assign to
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+ Iterator<KeyExtent> keListIter = entry.getValue().iterator();
+
+ List<TabletLocation> tabletsToAssignMapFileTo = new ArrayList<TabletLocation>();
+
+ while (keListIter.hasNext()) {
+ KeyExtent ke = keListIter.next();
+
+ try {
+ timer.start(Timers.QUERY_METADATA);
+ tabletsToAssignMapFileTo.addAll(findOverlappingTablets(instance.getConfiguration(), fs, locator, entry.getKey(), ke, credentials));
+ timer.stop(Timers.QUERY_METADATA);
+ keListIter.remove();
+ } catch (Exception ex) {
+ log.warn("Exception finding overlapping tablets, will retry tablet " + ke);
+ }
+ }
+
+ if (tabletsToAssignMapFileTo.size() > 0)
+ assignments.put(entry.getKey(), tabletsToAssignMapFileTo);
+ }
+
+ assignmentStats.attemptingAssignments(assignments);
+ Map<Path,List<KeyExtent>> assignmentFailures2 = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+ numThreads);
+ assignmentStats.assignmentsFailed(assignmentFailures2);
+
+ // merge assignmentFailures2 into assignmentFailures
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures2.entrySet()) {
+ assignmentFailures.get(entry.getKey()).addAll(entry.getValue());
+
+ Integer fc = failureCount.get(entry.getKey());
+ if (fc == null)
+ fc = 0;
+
+ failureCount.put(entry.getKey(), fc + 1);
+ }
+
+ // remove map files that have no more key extents to assign
+ Iterator<Entry<Path,List<KeyExtent>>> afIter = assignmentFailures.entrySet().iterator();
+ while (afIter.hasNext()) {
+ Entry<Path,List<KeyExtent>> entry = afIter.next();
+ if (entry.getValue().size() == 0)
+ afIter.remove();
+ }
+
+ Set<Entry<Path,Integer>> failureIter = failureCount.entrySet();
+ for (Entry<Path,Integer> entry : failureIter) {
+ int retries = acuConf.getCount(Property.TSERV_BULK_RETRY);
+ if (entry.getValue() > retries && assignmentFailures.get(entry.getKey()) != null) {
+ log.error("Map file " + entry.getKey() + " failed more than " + retries + " times, giving up.");
+ completeFailures.put(entry.getKey(), assignmentFailures.get(entry.getKey()));
+ assignmentFailures.remove(entry.getKey());
+ }
+ }
+ }
+ assignmentStats.assignmentsAbandoned(completeFailures);
+ Set<Path> failedFailures = processFailures(completeFailures);
+ assignmentStats.unrecoveredMapFiles(failedFailures);
+
+ timer.stop(Timers.TOTAL);
+ printReport();
+ return assignmentStats;
+ } finally {
+ if (client != null)
+ ServerClient.close(client);
+ locator.invalidateCache();
+ }
+ }
+
+ /**
+  * Logs, at debug level, the seconds spent in each timed phase and its share of the total, followed by the unaccounted ("Misc") time and the total.
+  */
+ private void printReport() {
+   long totalTime = 0;
+   for (Timers t : Timers.values()) {
+     if (t != Timers.TOTAL) {
+       totalTime += timer.get(t);
+     }
+   }
+
+   log.debug("BULK IMPORT TIMING STATISTICS");
+   log.debug(String.format("Examine map files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.EXAMINE_MAP_FILES), percentOfTotal(Timers.EXAMINE_MAP_FILES),
+       "%"));
+   log.debug(String.format("Query %-14s : %,10.2f secs %6.2f%s", MetadataTable.NAME, timer.getSecs(Timers.QUERY_METADATA),
+       percentOfTotal(Timers.QUERY_METADATA), "%"));
+   log.debug(String.format("Import Map Files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.IMPORT_MAP_FILES), percentOfTotal(Timers.IMPORT_MAP_FILES),
+       "%"));
+   log.debug(String.format("Sleep : %,10.2f secs %6.2f%s", timer.getSecs(Timers.SLEEP), percentOfTotal(Timers.SLEEP), "%"));
+   log.debug(String.format("Misc : %,10.2f secs %6.2f%s", (timer.get(Timers.TOTAL) - totalTime) / 1000.0,
+       100.0 * (timer.get(Timers.TOTAL) - totalTime) / timer.get(Timers.TOTAL), "%"));
+   log.debug(String.format("Total : %,10.2f secs", timer.getSecs(Timers.TOTAL)));
+ }
+
+ /** Share of the TOTAL timer taken by the given phase, as a percentage. */
+ private double percentOfTotal(Timers phase) {
+   return 100.0 * timer.get(phase) / timer.get(Timers.TOTAL);
+ }
+
+ /**
+  * Logs every file/extent pair in the given map at debug level.
+  *
+  * @return always an empty set; nothing is moved or recovered here
+  */
+ private Set<Path> processFailures(Map<Path,List<KeyExtent>> completeFailures) {
+   // we should check if map file was not assigned to any tablets, then we
+   // should just move it; not currently being done?
+
+   Set<Entry<Path,List<KeyExtent>>> failedEntries = completeFailures.entrySet();
+
+   if (completeFailures.size() != 0) {
+     log.debug("The following map files failed ");
+
+     for (Entry<Path,List<KeyExtent>> failed : failedEntries) {
+       Path file = failed.getKey();
+       for (KeyExtent extent : failed.getValue()) {
+         log.debug("\t" + file + " -> " + extent);
+       }
+     }
+   }
+
+   return Collections.emptySet();
+ }
+
+ /** Pairs a tablet extent with the estimated number of bytes of a file that fall inside it. */
+ private class AssignmentInfo {
+   KeyExtent ke;
+   long estSize;
+
+   /**
+    * @param estSize
+    *          unboxed on assignment, so a null value throws NullPointerException
+    */
+   public AssignmentInfo(KeyExtent keyExtent, Long estSize) {
+     this.estSize = estSize;
+     this.ke = keyExtent;
+   }
+ }
+
+ /** Collects the tablet_extent of every location, in list order, into a new list. */
+ private static List<KeyExtent> extentsOf(List<TabletLocation> locations) {
+   List<KeyExtent> extents = new ArrayList<KeyExtent>(locations.size());
+   Iterator<TabletLocation> it = locations.iterator();
+   while (it.hasNext()) {
+     extents.add(it.next().tablet_extent);
+   }
+   return extents;
+ }
+
+ /**
+  * Estimates, for every assigned file, how many bytes of it belong to each tablet it was assigned to.
+  * <p>
+  * The length of every path in {@code paths} is read first. A file assigned to exactly one tablet gets its whole length attributed to that tablet. Files
+  * assigned to several tablets are estimated with {@link FileUtil#estimateSizes} in a thread pool; if that throws an IOException the length is split
+  * evenly across the tablets instead.
+  *
+  * @param numThreads
+  *          size of the estimation thread pool
+  * @return a synchronized map from file to its per-tablet size estimates
+  * @throws RuntimeException
+  *           if reading a file length fails, or if the thread is interrupted while waiting for the pool (the interrupt flag is restored first)
+  */
+ private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final VolumeManager vm,
+     Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) {
+
+   long t1 = System.currentTimeMillis();
+   final Map<Path,Long> mapFileSizes = new TreeMap<Path,Long>();
+
+   try {
+     for (Path path : paths) {
+       FileSystem fs = vm.getFileSystemByPath(path);
+       mapFileSizes.put(path, fs.getContentSummary(path).getLength());
+     }
+   } catch (IOException e) {
+     log.error("Failed to get map files in for " + paths + ": " + e.getMessage(), e);
+     throw new RuntimeException(e);
+   }
+
+   final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
+
+   ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
+
+   for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+     if (entry.getValue().size() == 1) {
+       TabletLocation tabletLocation = entry.getValue().get(0);
+
+       // if the tablet completely contains the map file, there is no
+       // need to estimate its
+       // size
+       ais.put(entry.getKey(), Collections.singletonList(new AssignmentInfo(tabletLocation.tablet_extent, mapFileSizes.get(entry.getKey()))));
+       continue;
+     }
+
+     Runnable estimationTask = new Runnable() {
+       @Override
+       public void run() {
+         Map<KeyExtent,Long> estimatedSizes = null;
+
+         try {
+           FileSystem fs = vm.getFileSystemByPath(entry.getKey());
+           estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, fs);
+         } catch (IOException e) {
+           // keep the cause in the log; the even split below is used instead
+           log.warn("Failed to estimate map file sizes " + e.getMessage(), e);
+         }
+
+         if (estimatedSizes == null) {
+           // estimation failed, do a simple estimation
+           estimatedSizes = new TreeMap<KeyExtent,Long>();
+           long estSize = (long) (mapFileSizes.get(entry.getKey()) / (double) entry.getValue().size());
+           for (TabletLocation tl : entry.getValue())
+             estimatedSizes.put(tl.tablet_extent, estSize);
+         }
+
+         List<AssignmentInfo> assignmentInfoList = new ArrayList<AssignmentInfo>(estimatedSizes.size());
+
+         for (Entry<KeyExtent,Long> entry2 : estimatedSizes.entrySet())
+           assignmentInfoList.add(new AssignmentInfo(entry2.getKey(), entry2.getValue()));
+
+         ais.put(entry.getKey(), assignmentInfoList);
+       }
+     };
+
+     threadPool.submit(new TraceRunnable(new LoggingRunnable(log, estimationTask)));
+   }
+
+   threadPool.shutdown();
+
+   while (!threadPool.isTerminated()) {
+     try {
+       threadPool.awaitTermination(60, TimeUnit.SECONDS);
+     } catch (InterruptedException e) {
+       // restore the interrupt flag for callers; the exception itself travels as the cause
+       Thread.currentThread().interrupt();
+       throw new RuntimeException(e);
+     }
+   }
+
+   long t2 = System.currentTimeMillis();
+
+   log.debug(String.format("Estimated map files sizes in %6.2f secs", (t2 - t1) / 1000.0));
+
+   return ais;
+ }
+
+ /** Flattens the per-file assignments into a map from tablet extent to tablet location; a later entry for the same extent overwrites an earlier one. */
+ private static Map<KeyExtent,String> locationsOf(Map<Path,List<TabletLocation>> assignments) {
+   Map<KeyExtent,String> byExtent = new HashMap<KeyExtent,String>();
+   for (Entry<Path,List<TabletLocation>> perFile : assignments.entrySet()) {
+     for (TabletLocation loc : perFile.getValue()) {
+       byExtent.put(loc.tablet_extent, loc.tablet_location);
+     }
+   }
+   return byExtent;
+ }
+
+ /**
+  * Estimates per-tablet sizes for the given assignments and then performs the assignment, timing each step.
+  *
+  * @param numThreads
+  *          thread count passed to the assignment step
+  * @param numMapThreads
+  *          thread count passed to the size-estimation step
+  * @return the failures reported by the assignment step
+  */
+ private Map<Path,List<KeyExtent>> assignMapFiles(AccumuloConfiguration acuConf, Instance instance, Configuration conf, Credentials credentials,
+     VolumeManager fs, String tableId, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) {
+   // step 1: size estimation, charged to EXAMINE_MAP_FILES
+   timer.start(Timers.EXAMINE_MAP_FILES);
+   Map<Path,List<AssignmentInfo>> sizedAssignments = estimateSizes(acuConf, conf, fs, assignments, paths, numMapThreads);
+   timer.stop(Timers.EXAMINE_MAP_FILES);
+
+   // step 2: the actual assignment, charged to IMPORT_MAP_FILES
+   timer.start(Timers.IMPORT_MAP_FILES);
+   Map<KeyExtent,String> tabletLocations = locationsOf(assignments);
+   Map<Path,List<KeyExtent>> failures = assignMapFiles(credentials, tableId, sizedAssignments, tabletLocations, numThreads);
+   timer.stop(Timers.IMPORT_MAP_FILES);
+
+   return failures;
+ }
+
+ private class AssignmentTask implements Runnable {
+ final Map<Path,List<KeyExtent>> assignmentFailures;
+ String location;
+ Credentials credentials;
+ private Map<KeyExtent,List<PathSize>> assignmentsPerTablet;
+
+ public AssignmentTask(Credentials credentials, Map<Path,List<KeyExtent>> assignmentFailures, String tableName, String location,
+ Map<KeyExtent,List<PathSize>> assignmentsPerTablet) {
+ this.assignmentFailures = assignmentFailures;
+ this.location = location;
+ this.assignmentsPerTablet = assignmentsPerTablet;
+ this.credentials = credentials;
+ }
+
+ private void handleFailures(Collection<KeyExtent> failures, String message) {
+ for (KeyExtent ke : failures) {
+ List<PathSize> mapFiles = assignmentsPerTablet.get(ke);
+ synchronized (assignmentFailures) {
+ for (PathSize pathSize : mapFiles) {
+ List<KeyExtent> existingFailures = assignmentFailures.get(pathSize.path);
+ if (existingFailures == null) {
+ existingFailures = new ArrayList<KeyExtent>();
+ assignmentFailures.put(pathSize.path, existingFailures);
+ }
+
+ existingFailures.add(ke);
+ }
+ }
+
+ log.info("Could not assign " + mapFiles.size() + " map files to tablet " + ke + " because : " + message + ". Will retry ...");
+ }
+ }
+
+ @Override
+ public void run() {
+ HashSet<Path> uniqMapFiles = new HashSet<Path>();
+ for (List<PathSize> mapFiles : assignmentsPerTablet.values())
+ for (PathSize ps : mapFiles)
+ uniqMapFiles.add(ps.path);
+
+ log.debug("Assigning " + uniqMapFiles.size() + " map files to " + assignmentsPerTablet.size() + " tablets at " + location);
+
+ try {
+ List<KeyExtent> failures = assignMapFiles(credentials, location, assignmentsPerTablet);
+ handleFailures(failures, "Not Serving Tablet");
+ } catch (AccumuloException e) {
+ handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
+ } catch (AccumuloSecurityException e) {
+ handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
+ }
+ }
+
+ }
+
+ private class PathSize {
+ public PathSize(Path mapFile, long estSize) {
+ this.path = mapFile;
+ this.estSize = estSize;
+ }
+
+ Path path;
+ long estSize;
+
+ @Override
+ public String toString() {
+ return path + " " + estSize;
+ }
+ }
+
+ private Map<Path,List<KeyExtent>> assignMapFiles(Credentials credentials, String tableName, Map<Path,List<AssignmentInfo>> assignments,
+ Map<KeyExtent,String> locations, int numThreads) {
+
+ // group assignments by tablet
+ Map<KeyExtent,List<PathSize>> assignmentsPerTablet = new TreeMap<KeyExtent,List<PathSize>>();
+ for (Entry<Path,List<AssignmentInfo>> entry : assignments.entrySet()) {
+ Path mapFile = entry.getKey();
+ List<AssignmentInfo> tabletsToAssignMapFileTo = entry.getValue();
+
+ for (AssignmentInfo ai : tabletsToAssignMapFileTo) {
+ List<PathSize> mapFiles = assignmentsPerTablet.get(ai.ke);
+ if (mapFiles == null) {
+ mapFiles = new ArrayList<PathSize>();
+ assignmentsPerTablet.put(ai.ke, mapFiles);
+ }
+
+ mapFiles.add(new PathSize(mapFile, ai.estSize));
+ }
+ }
+
+ // group assignments by tabletserver
+
+ Map<Path,List<KeyExtent>> assignmentFailures = Collections.synchronizedMap(new TreeMap<Path,List<KeyExtent>>());
+
+ TreeMap<String,Map<KeyExtent,List<PathSize>>> assignmentsPerTabletServer = new TreeMap<String,Map<KeyExtent,List<PathSize>>>();
+
+ for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+ KeyExtent ke = entry.getKey();
+ String location = locations.get(ke);
+
+ if (location == null) {
+ for (PathSize pathSize : entry.getValue()) {
+ synchronized (assignmentFailures) {
+ List<KeyExtent> failures = assignmentFailures.get(pathSize.path);
+ if (failures == null) {
+ failures = new ArrayList<KeyExtent>();
+ assignmentFailures.put(pathSize.path, failures);
+ }
+
+ failures.add(ke);
+ }
+ }
+
+ log.warn("Could not assign " + entry.getValue().size() + " map files to tablet " + ke + " because it had no location, will retry ...");
+
+ continue;
+ }
+
+ Map<KeyExtent,List<PathSize>> apt = assignmentsPerTabletServer.get(location);
+ if (apt == null) {
+ apt = new TreeMap<KeyExtent,List<PathSize>>();
+ assignmentsPerTabletServer.put(location, apt);
+ }
+
+ apt.put(entry.getKey(), entry.getValue());
+ }
+
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
+
+ for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
+ String location = entry.getKey();
+ threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
+ }
+
+ threadPool.shutdown();
+
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ return assignmentFailures;
+ }
+
+ /**
+  * Sends one bulk import request to the tablet server at {@code location} covering every tablet in {@code assignmentsPerTablet}.
+  *
+  * @param credentials
+  *          credentials converted to thrift form for the request
+  * @param location
+  *          address used to obtain the tablet server client
+  * @param assignmentsPerTablet
+  *          map files, with size estimates, to load into each tablet
+  * @return the extents the tablet server reported back from the bulk import call
+  * @throws AccumuloSecurityException
+  *           if the call fails with a {@code ThriftSecurityException}
+  * @throws AccumuloException
+  *           wrapping any other failure
+  */
+ private List<KeyExtent> assignMapFiles(Credentials credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet)
+     throws AccumuloException, AccumuloSecurityException {
+   try {
+     long timeInMillis = instance.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
+     TabletClientService.Iface client = ThriftUtil.getTServerClient(location, timeInMillis);
+     try {
+       // translate the per-tablet file lists into the nested map the thrift call expects: extent -> (file name -> file info)
+       HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
+       for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+         HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo> tabletFiles = new HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo>();
+         files.put(entry.getKey(), tabletFiles);
+
+         for (PathSize pathSize : entry.getValue()) {
+           org.apache.accumulo.core.data.thrift.MapFileInfo mfi = new org.apache.accumulo.core.data.thrift.MapFileInfo(pathSize.estSize);
+           tabletFiles.put(pathSize.path.toString(), mfi);
+         }
+       }
+
+       log.debug("Asking " + location + " to bulk load " + files);
+       List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), credentials.toThrift(instance), tid, Translator.translate(files, Translator.KET),
+           setTime);
+
+       return Translator.translate(failures, Translator.TKET);
+     } finally {
+       // always hand the client back, whether or not the call succeeded
+       ThriftUtil.returnClient((TServiceClient) client);
+     }
+   } catch (ThriftSecurityException e) {
+     throw new AccumuloSecurityException(e.user, e.code, e);
+   } catch (Throwable t) {
+     // everything is caught on purpose: this runs on a pool thread and the caller turns the exception into recorded failures.
+     // Log with the server location instead of dumping the trace to stderr.
+     log.warn("Bulk import request to " + location + " failed", t);
+     throw new AccumuloException(t);
+   }
+ }
+
+ /**
+  * Finds the tablets overlapping a file without restricting the row range: delegates to the range-based overload with both bounds unset.
+  */
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+     Credentials credentials) throws Exception {
+   final Text unboundedStart = null;
+   final Text unboundedEnd = null;
+   return findOverlappingTablets(acuConf, fs, locator, file, unboundedStart, unboundedEnd, credentials);
+ }
+
+ /**
+  * Re-locates the tablets that overlap {@code file} within the row range of an extent whose assignment failed. The locator cache entry for the
+  * extent is invalidated first.
+  *
+  * @param failed
+  *          extent whose assignment failed; its previous end row and end row bound the search
+  * @return the tablets found by the range-based overload
+  */
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+     KeyExtent failed,
+     Credentials credentials) throws Exception {
+   locator.invalidateCache(failed);
+   Text start = failed.getPrevEndRow();
+   if (start != null) {
+     // The first row that can belong to this extent is the previous end row followed by a zero byte, which is how the range-based overload steps
+     // past a tablet's end row. Range.followingPrefix would instead jump past every row that starts with the previous end row, missing rows that
+     // belong to this extent. Copy first so the extent's own Text is not modified.
+     start = new Text(start);
+     start.append(byte0, 0, byte0.length);
+   }
+   return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow(), credentials);
+ }
+
+ final static byte[] byte0 = {0};
+
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager vm, TabletLocator locator, Path file, Text startRow,
+ Text endRow, Credentials credentials) throws Exception {
+ List<TabletLocation> result = new ArrayList<TabletLocation>();
+ Collection<ByteSequence> columnFamilies = Collections.emptyList();
+ String filename = file.toString();
+ // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
+ FileSystem fs = vm.getFileSystemByPath(file);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
+ try {
+ Text row = startRow;
+ if (row == null)
+ row = new Text();
+ while (true) {
+ // log.debug(filename + " Seeking to row " + row);
+ reader.seek(new Range(row, null), columnFamilies, false);
+ if (!reader.hasTop()) {
+ // log.debug(filename + " not found");
+ break;
+ }
+ row = reader.getTopKey().getRow();
+ TabletLocation tabletLocation = locator.locateTablet(credentials, row, false, true);
+ // log.debug(filename + " found row " + row + " at location " + tabletLocation);
+ result.add(tabletLocation);
+ row = tabletLocation.tablet_extent.getEndRow();
+ if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
+ row = new Text(row);
+ row.append(byte0, 0, byte0.length);
+ } else
+ break;
+ }
+ } finally {
+ reader.close();
+ }
+ // log.debug(filename + " to be sent to " + result);
+ return result;
+ }
+
+ /**
+  * Accumulates per-tablet assignment counts and failure information, and renders them as a text report via {@link #toString()}.
+  */
+ public static class AssignmentStats {
+   private Map<KeyExtent,Integer> counts;
+   private int numUniqueMapFiles;
+   // both stay null until the corresponding setter-style method is called
+   private Map<Path,List<KeyExtent>> completeFailures = null;
+   private Set<Path> failedFailures = null;
+
+   AssignmentStats(int fileCount) {
+     counts = new HashMap<KeyExtent,Integer>();
+     numUniqueMapFiles = fileCount;
+   }
+
+   /** Adds one to the count of every tablet listed for every map file. */
+   void attemptingAssignments(Map<Path,List<TabletLocation>> assignments) {
+     for (Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+       for (TabletLocation tl : entry.getValue()) {
+
+         Integer count = getCount(tl.tablet_extent);
+
+         counts.put(tl.tablet_extent, count + 1);
+       }
+     }
+   }
+
+   /** Subtracts one from the count of every tablet listed for every map file. */
+   void assignmentsFailed(Map<Path,List<KeyExtent>> assignmentFailures) {
+     for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+       for (KeyExtent ke : entry.getValue()) {
+
+         Integer count = getCount(ke);
+
+         counts.put(ke, count - 1);
+       }
+     }
+   }
+
+   void assignmentsAbandoned(Map<Path,List<KeyExtent>> completeFailures) {
+     this.completeFailures = completeFailures;
+   }
+
+   /** Replaces the parent's entry with one entry per child, each carrying the parent's count. */
+   void tabletSplit(KeyExtent parent, Collection<KeyExtent> children) {
+     Integer count = getCount(parent);
+
+     counts.remove(parent);
+
+     for (KeyExtent keyExtent : children)
+       counts.put(keyExtent, count);
+   }
+
+   /** Returns the recorded count for an extent, or zero when there is none. */
+   private Integer getCount(KeyExtent parent) {
+     Integer count = counts.get(parent);
+
+     if (count == null) {
+       count = 0;
+     }
+     return count;
+   }
+
+   void unrecoveredMapFiles(Set<Path> failedFailures) {
+     this.failedFailures = failedFailures;
+   }
+
+   /** Percentage of {@code part} in {@code whole}; zero when {@code whole} is zero so the report never shows NaN. */
+   private static double percent(int part, int whole) {
+     return whole == 0 ? 0.0 : part * 100.0 / whole;
+   }
+
+   /**
+    * Builds the statistics report. Failure information that was never supplied is treated as empty, and an empty set of tablets reports zero for
+    * min, max, average and standard deviation.
+    */
+   @Override
+   public String toString() {
+     StringBuilder sb = new StringBuilder();
+     int totalAssignments = 0;
+     int tabletsImportedTo = 0;
+     int tabletCount = counts.size();
+
+     int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
+
+     for (Entry<KeyExtent,Integer> entry : counts.entrySet()) {
+       int value = entry.getValue();
+       totalAssignments += value;
+       if (value > 0)
+         tabletsImportedTo++;
+
+       if (value < min)
+         min = value;
+
+       if (value > max)
+         max = value;
+     }
+
+     if (tabletCount == 0) {
+       // no tablets were seen, so the sentinel values would be meaningless in the report
+       min = 0;
+       max = 0;
+     }
+
+     double average = tabletCount == 0 ? 0.0 : totalAssignments / (double) tabletCount;
+
+     double stddev = 0;
+
+     for (Entry<KeyExtent,Integer> entry : counts.entrySet())
+       stddev += Math.pow(entry.getValue() - average, 2);
+
+     if (tabletCount > 0)
+       stddev = Math.sqrt(stddev / tabletCount);
+
+     // either field may still be null when the report is requested before the failure information was supplied
+     int filesWithFailures = completeFailures == null ? 0 : completeFailures.size();
+     int unrecoveredFiles = failedFailures == null ? 0 : failedFailures.size();
+
+     Set<KeyExtent> failedTablets = new HashSet<KeyExtent>();
+     if (completeFailures != null) {
+       for (List<KeyExtent> ft : completeFailures.values())
+         failedTablets.addAll(ft);
+     }
+
+     sb.append("BULK IMPORT ASSIGNMENT STATISTICS\n");
+     sb.append(String.format("# of map files : %,10d%n", numUniqueMapFiles));
+     sb.append(String.format("# map files with failures : %,10d %6.2f%s%n", filesWithFailures, percent(filesWithFailures, numUniqueMapFiles), "%"));
+     sb.append(String.format("# failed failed map files : %,10d %s%n", unrecoveredFiles, unrecoveredFiles > 0 ? " <-- THIS IS BAD" : ""));
+     sb.append(String.format("# of tablets : %,10d%n", tabletCount));
+     sb.append(String.format("# tablets imported to : %,10d %6.2f%s%n", tabletsImportedTo, percent(tabletsImportedTo, tabletCount), "%"));
+     sb.append(String.format("# tablets with failures : %,10d %6.2f%s%n", failedTablets.size(), percent(failedTablets.size(), tabletCount), "%"));
+     sb.append(String.format("min map files per tablet : %,10d%n", min));
+     sb.append(String.format("max map files per tablet : %,10d%n", max));
+     sb.append(String.format("avg map files per tablet : %,10.2f (std dev = %.2f)%n", average, stddev));
+     return sb.toString();
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
new file mode 100644
index 0000000..3f1aaa2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ConfigurationType;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.TDiskUsage;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.util.TableDiskUsage;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+ /**
+  * Server-side implementation of the {@code ClientService} thrift interface. Most operations delegate to the shared {@link SecurityOperation}; table
+  * names supplied by callers are resolved to table ids through {@link #checkTableId(String, TableOperation)}.
+  */
+ public class ClientServiceHandler implements ClientService.Iface {
+   private static final Logger log = Logger.getLogger(ClientServiceHandler.class);
+   // assigned once at class initialization and never replaced
+   private static final SecurityOperation security = AuditedSecurityOperation.getInstance();
+   protected final TransactionWatcher transactionWatcher;
+   private final Instance instance;
+   private final VolumeManager fs;
+
+   public ClientServiceHandler(Instance instance, TransactionWatcher transactionWatcher, VolumeManager fs) {
+     this.instance = instance;
+     this.transactionWatcher = transactionWatcher;
+     this.fs = fs;
+   }
+
+   /**
+    * Resolves a table name to its id, clearing the table cache and retrying once before giving up.
+    *
+    * @throws ThriftTableOperationException
+    *           with type {@code NOTFOUND} if the name is still unknown after the cache was cleared
+    */
+   protected String checkTableId(String tableName, TableOperation operation) throws ThriftTableOperationException {
+     String tableId = Tables.getNameToIdMap(instance).get(tableName);
+     if (tableId == null) {
+       // maybe the table exist, but the cache was not updated yet... so try to clear the cache and check again
+       Tables.clearCache(instance);
+       tableId = Tables.getNameToIdMap(instance).get(tableName);
+       if (tableId == null)
+         throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.NOTFOUND, null);
+     }
+     return tableId;
+   }
+
+   @Override
+   public String getInstanceId() {
+     return instance.getInstanceID();
+   }
+
+   @Override
+   public String getRootTabletLocation() {
+     return instance.getRootTabletLocation();
+   }
+
+   @Override
+   public String getZooKeepers() {
+     return instance.getZooKeepers();
+   }
+
+   @Override
+   public void ping(TCredentials credentials) {
+     // anybody can call this; no authentication check
+     log.info("Master reports: I just got pinged!");
+   }
+
+   @Override
+   public boolean authenticate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+     try {
+       return security.authenticateUser(credentials, credentials);
+     } catch (ThriftSecurityException e) {
+       log.error(e);
+       throw e;
+     }
+   }
+
+   @Override
+   public boolean authenticateUser(TInfo tinfo, TCredentials credentials, TCredentials toAuth) throws ThriftSecurityException {
+     try {
+       return security.authenticateUser(credentials, toAuth);
+     } catch (ThriftSecurityException e) {
+       log.error(e);
+       throw e;
+     }
+   }
+
+   @Override
+   public void changeAuthorizations(TInfo tinfo, TCredentials credentials, String user, List<ByteBuffer> authorizations) throws ThriftSecurityException {
+     security.changeAuthorizations(credentials, user, new Authorizations(authorizations));
+   }
+
+   @Override
+   public void changeLocalUserPassword(TInfo tinfo, TCredentials credentials, String principal, ByteBuffer password) throws ThriftSecurityException {
+     PasswordToken token = new PasswordToken(password);
+     Credentials toChange = new Credentials(principal, token);
+     security.changePassword(credentials, toChange);
+   }
+
+   @Override
+   public void createLocalUser(TInfo tinfo, TCredentials credentials, String principal, ByteBuffer password) throws ThriftSecurityException {
+     PasswordToken token = new PasswordToken(password);
+     Credentials newUser = new Credentials(principal, token);
+     security.createUser(credentials, newUser, new Authorizations());
+   }
+
+   @Override
+   public void dropLocalUser(TInfo tinfo, TCredentials credentials, String user) throws ThriftSecurityException {
+     security.dropUser(credentials, user);
+   }
+
+   @Override
+   public List<ByteBuffer> getUserAuthorizations(TInfo tinfo, TCredentials credentials, String user) throws ThriftSecurityException {
+     return security.getUserAuthorizations(credentials, user).getAuthorizationsBB();
+   }
+
+   @Override
+   public void grantSystemPermission(TInfo tinfo, TCredentials credentials, String user, byte permission) throws ThriftSecurityException {
+     security.grantSystemPermission(credentials, user, SystemPermission.getPermissionById(permission));
+   }
+
+   @Override
+   public void grantTablePermission(TInfo tinfo, TCredentials credentials, String user, String tableName, byte permission) throws ThriftSecurityException,
+       ThriftTableOperationException {
+     String tableId = checkTableId(tableName, TableOperation.PERMISSION);
+     security.grantTablePermission(credentials, user, tableId, TablePermission.getPermissionById(permission));
+   }
+
+   @Override
+   public void revokeSystemPermission(TInfo tinfo, TCredentials credentials, String user, byte permission) throws ThriftSecurityException {
+     security.revokeSystemPermission(credentials, user, SystemPermission.getPermissionById(permission));
+   }
+
+   @Override
+   public void revokeTablePermission(TInfo tinfo, TCredentials credentials, String user, String tableName, byte permission) throws ThriftSecurityException,
+       ThriftTableOperationException {
+     String tableId = checkTableId(tableName, TableOperation.PERMISSION);
+     security.revokeTablePermission(credentials, user, tableId, TablePermission.getPermissionById(permission));
+   }
+
+   @Override
+   public boolean hasSystemPermission(TInfo tinfo, TCredentials credentials, String user, byte sysPerm) throws ThriftSecurityException {
+     return security.hasSystemPermission(credentials, user, SystemPermission.getPermissionById(sysPerm));
+   }
+
+   @Override
+   public boolean hasTablePermission(TInfo tinfo, TCredentials credentials, String user, String tableName, byte tblPerm) throws ThriftSecurityException,
+       ThriftTableOperationException {
+     String tableId = checkTableId(tableName, TableOperation.PERMISSION);
+     return security.hasTablePermission(credentials, user, tableId, TablePermission.getPermissionById(tblPerm));
+   }
+
+   @Override
+   public Set<String> listLocalUsers(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+     return security.listUsers(credentials);
+   }
+
+   /**
+    * Authenticates the caller, invalidates the configuration's cache and copies every property that {@link Property#isSensitive(String)} does not
+    * flag into a new map.
+    */
+   private static Map<String,String> conf(TCredentials credentials, AccumuloConfiguration conf) throws TException {
+     security.authenticateUser(credentials, credentials);
+     conf.invalidateCache();
+
+     Map<String,String> result = new HashMap<String,String>();
+     for (Entry<String,String> entry : conf) {
+       String key = entry.getKey();
+       if (!Property.isSensitive(key))
+         result.put(key, entry.getValue());
+     }
+     return result;
+   }
+
+   @Override
+   public Map<String,String> getConfiguration(TInfo tinfo, TCredentials credentials, ConfigurationType type) throws TException {
+     switch (type) {
+       case CURRENT:
+         return conf(credentials, new ServerConfiguration(instance).getConfiguration());
+       case SITE:
+         return conf(credentials, ServerConfiguration.getSiteConfiguration());
+       case DEFAULT:
+         return conf(credentials, AccumuloConfiguration.getDefaultConfiguration());
+     }
+     throw new RuntimeException("Unexpected configuration type " + type);
+   }
+
+   @Override
+   public Map<String,String> getTableConfiguration(TInfo tinfo, TCredentials credentials, String tableName) throws TException, ThriftTableOperationException {
+     String tableId = checkTableId(tableName, null);
+     return conf(credentials, new ServerConfiguration(instance).getTableConfiguration(tableId));
+   }
+
+   /**
+    * Runs a bulk load inside the transaction identified by {@code tid}, after checking that the caller may perform system actions.
+    *
+    * @return the list returned by {@code BulkImporter.bulkLoad}
+    */
+   @Override
+   public List<String> bulkImportFiles(TInfo tinfo, final TCredentials credentials, final long tid, final String tableId, final List<String> files,
+       final String errorDir, final boolean setTime) throws ThriftSecurityException, ThriftTableOperationException, TException {
+     try {
+       if (!security.canPerformSystemActions(credentials))
+         throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+       return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<String>>() {
+         @Override
+         public List<String> call() throws Exception {
+           return BulkImporter.bulkLoad(new ServerConfiguration(instance).getConfiguration(), instance, new Credentials(credentials.getPrincipal(),
+               AuthenticationTokenSerializer.deserialize(credentials.getTokenClassName(), credentials.getToken())), tid, tableId, files, errorDir, setTime);
+         }
+       });
+     } catch (AccumuloSecurityException e) {
+       throw e.asThriftException();
+     } catch (Exception ex) {
+       throw new TException(ex);
+     }
+   }
+
+   @Override
+   public boolean isActive(TInfo tinfo, long tid) throws TException {
+     return transactionWatcher.isActive(tid);
+   }
+
+   /**
+    * Checks that {@code className} can be loaded as a subtype of {@code interfaceMatch} and instantiated.
+    *
+    * @return {@code true} if loading and instantiation succeed, {@code false} if one of the caught exceptions occurs
+    */
+   @SuppressWarnings({"rawtypes", "unchecked"})
+   @Override
+   public boolean checkClass(TInfo tinfo, TCredentials credentials, String className, String interfaceMatch) throws TException {
+     security.authenticateUser(credentials, credentials);
+
+     ClassLoader loader = getClass().getClassLoader();
+     Class shouldMatch;
+     try {
+       shouldMatch = loader.loadClass(interfaceMatch);
+       Class test = AccumuloVFSClassLoader.loadClass(className, shouldMatch);
+       test.newInstance();
+       return true;
+     } catch (ClassCastException e) {
+       log.warn("Error checking object types", e);
+       return false;
+     } catch (ClassNotFoundException e) {
+       log.warn("Error checking object types", e);
+       return false;
+     } catch (InstantiationException e) {
+       log.warn("Error checking object types", e);
+       return false;
+     } catch (IllegalAccessException e) {
+       log.warn("Error checking object types", e);
+       return false;
+     }
+   }
+
+   /**
+    * Same check as {@link #checkClass}, but the class is loaded through the class loader selected by the table's {@code TABLE_CLASSPATH} property
+    * when that property is non-empty.
+    *
+    * @return {@code true} if loading and instantiation succeed, {@code false} on any exception
+    */
+   @Override
+   public boolean checkTableClass(TInfo tinfo, TCredentials credentials, String tableName, String className, String interfaceMatch) throws TException,
+       ThriftTableOperationException, ThriftSecurityException {
+
+     security.authenticateUser(credentials, credentials);
+
+     String tableId = checkTableId(tableName, null);
+
+     ClassLoader loader = getClass().getClassLoader();
+     Class<?> shouldMatch;
+     try {
+       shouldMatch = loader.loadClass(interfaceMatch);
+
+       // the table configuration is looked up once; an earlier duplicate lookup whose result was discarded has been removed
+       String context = new ServerConfiguration(instance).getTableConfiguration(tableId).get(Property.TABLE_CLASSPATH);
+
+       ClassLoader currentLoader;
+
+       if (context != null && !context.equals("")) {
+         currentLoader = AccumuloVFSClassLoader.getContextManager().getClassLoader(context);
+       } else {
+         currentLoader = AccumuloVFSClassLoader.getClassLoader();
+       }
+
+       Class<?> test = currentLoader.loadClass(className).asSubclass(shouldMatch);
+       test.newInstance();
+       return true;
+     } catch (Exception e) {
+       log.warn("Error checking object types", e);
+       return false;
+     }
+   }
+
+   /**
+    * Reports disk usage for the given tables after verifying that each one exists and that the caller may scan it.
+    *
+    * @throws ThriftSecurityException
+    *           with {@code PERMISSION_DENIED} if the caller may not scan one of the tables
+    */
+   @Override
+   public List<TDiskUsage> getDiskUsage(Set<String> tables, TCredentials credentials) throws ThriftTableOperationException, ThriftSecurityException, TException {
+     try {
+       AuthenticationToken token = AuthenticationTokenSerializer.deserialize(credentials.getTokenClassName(), credentials.getToken());
+       Connector conn = instance.getConnector(credentials.getPrincipal(), token);
+
+       HashSet<String> tableIds = new HashSet<String>();
+
+       for (String table : tables) {
+         // ensure that the table exists
+         String tableId = checkTableId(table, null);
+         tableIds.add(tableId);
+         if (!security.canScan(credentials, tableId))
+           throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+       }
+
+       // use the same set of tableIds that were validated above to avoid race conditions
+       Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(new ServerConfiguration(instance).getConfiguration(), tableIds, fs, conn);
+       List<TDiskUsage> retUsages = new ArrayList<TDiskUsage>();
+       for (Map.Entry<TreeSet<String>,Long> usageItem : diskUsage.entrySet()) {
+         retUsages.add(new TDiskUsage(new ArrayList<String>(usageItem.getKey()), usageItem.getValue()));
+       }
+       return retUsages;
+
+     } catch (AccumuloSecurityException e) {
+       throw e.asThriftException();
+     } catch (AccumuloException e) {
+       throw new TException(e);
+     } catch (IOException e) {
+       throw new TException(e);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
new file mode 100644
index 0000000..9e6bbe7
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ConnectorImpl;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * An implementation of Instance that looks in HDFS and ZooKeeper to find the master and root tablet location.
+ *
+ */
+ /**
+  * Singleton {@link Instance} obtained through {@link #getInstance()}. Locations are read through a shared {@link ZooCache} built from the site
+  * configuration, and the instance id is read once from the location given by {@code ServerConstants.getInstanceIdLocation()}.
+  */
+ public class HdfsZooInstance implements Instance {
+
+   public static class AccumuloNotInitializedException extends RuntimeException {
+     private static final long serialVersionUID = 1L;
+
+     public AccumuloNotInitializedException(String string) {
+       super(string);
+     }
+   }
+
+   // private: instances are only created through getInstance(); note that this assigns the static zooCache
+   private HdfsZooInstance() {
+     AccumuloConfiguration acuConf = ServerConfiguration.getSiteConfiguration();
+     zooCache = new ZooCache(acuConf.get(Property.INSTANCE_ZK_HOST), (int) acuConf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+   }
+
+   private static HdfsZooInstance cachedHdfsZooInstance = null;
+
+   /** Returns the shared instance, creating it on first use. */
+   public static synchronized Instance getInstance() {
+     if (cachedHdfsZooInstance == null)
+       cachedHdfsZooInstance = new HdfsZooInstance();
+     return cachedHdfsZooInstance;
+   }
+
+   private static ZooCache zooCache;
+   private static String instanceId = null;
+   private static final Logger log = Logger.getLogger(HdfsZooInstance.class);
+
+   /**
+    * @return the part of the cached root tablet location entry before the first {@code |}, or {@code null} when there is no entry
+    */
+   @Override
+   public String getRootTabletLocation() {
+     String zRootLocPath = ZooUtil.getRoot(this) + RootTable.ZROOT_TABLET_LOCATION;
+
+     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zoocache.");
+
+     byte[] loc = zooCache.get(zRootLocPath);
+
+     opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+
+     if (loc == null) {
+       return null;
+     }
+
+     return new String(loc).split("\\|")[0];
+   }
+
+   /**
+    * @return a single-element list holding the master lock data as a string, or an empty list when there is no lock data
+    */
+   @Override
+   public List<String> getMasterLocations() {
+
+     String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
+
+     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
+
+     byte[] loc = ZooLock.getLockData(zooCache, masterLocPath, null);
+
+     opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+
+     if (loc == null) {
+       return Collections.emptyList();
+     }
+
+     return Collections.singletonList(new String(loc));
+   }
+
+   @Override
+   public String getInstanceID() {
+     if (instanceId == null)
+       _getInstanceID();
+     return instanceId;
+   }
+
+   // synchronized and re-checks the field so the id is read from storage at most once
+   private static synchronized void _getInstanceID() {
+     if (instanceId == null) {
+       String instanceIdFromFile = ZooUtil.getInstanceIDFromHdfs(ServerConstants.getInstanceIdLocation());
+       instanceId = instanceIdFromFile;
+     }
+   }
+
+   @Override
+   public String getInstanceName() {
+     return ZooKeeperInstance.lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
+   }
+
+   @Override
+   public String getZooKeepers() {
+     return ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST);
+   }
+
+   @Override
+   public int getZooKeepersSessionTimeOut() {
+     return (int) ServerConfiguration.getSiteConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
+   }
+
+   @Override
+   public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
+     return new ConnectorImpl(this, new Credentials(principal, token));
+   }
+
+   @Deprecated
+   @Override
+   public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+     return getConnector(user, new PasswordToken(pass));
+   }
+
+   @Deprecated
+   @Override
+   public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
+     return getConnector(user, ByteBufferUtil.toBytes(pass));
+   }
+
+   @Deprecated
+   @Override
+   public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
+     return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
+   }
+
+   private AccumuloConfiguration conf = null;
+
+   /** Returns the configuration set through {@link #setConfiguration}, or builds one from this instance on first use. */
+   @Override
+   public AccumuloConfiguration getConfiguration() {
+     if (conf == null)
+       conf = new ServerConfiguration(this).getConfiguration();
+     return conf;
+   }
+
+   @Override
+   public void setConfiguration(AccumuloConfiguration conf) {
+     this.conf = conf;
+   }
+
+   /** Prints the instance name, id, ZooKeepers and master locations to standard output. */
+   public static void main(String[] args) {
+     Instance instance = HdfsZooInstance.getInstance();
+     System.out.println("Instance Name: " + instance.getInstanceName());
+     System.out.println("Instance ID: " + instance.getInstanceID());
+     System.out.println("ZooKeepers: " + instance.getZooKeepers());
+     System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
+   }
+
+   /**
+    * Closes the shared {@link ZooCache}.
+    *
+    * @throws AccumuloException
+    *           if the thread is interrupted while closing; the interrupt status is restored and the interruption is attached as the cause
+    */
+   @Override
+   public void close() throws AccumuloException {
+     try {
+       zooCache.close();
+     } catch (InterruptedException e) {
+       // keep the interrupt visible to the caller and preserve the original exception for diagnosis
+       Thread.currentThread().interrupt();
+       throw new AccumuloException("Issues closing ZooKeeper, try again", e);
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
new file mode 100644
index 0000000..442294f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+/**
+ * Command-line entry point that loads the server configuration of the local instance.
+ */
+public class ConfigSanityCheck {
+
+  /**
+   * Builds a {@link ServerConfiguration} for {@link HdfsZooInstance#getInstance()} and requests its configuration. The returned object is discarded.
+   * NOTE(review): presumably run so that a broken configuration fails loudly during loading - confirm what validation the load performs.
+   *
+   * @param args
+   *          not used
+   */
+  public static void main(String[] args) {
+    final ServerConfiguration serverConf = new ServerConfiguration(HdfsZooInstance.getInstance());
+    serverConf.getConfiguration();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
new file mode 100644
index 0000000..c9fd5a1
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.security.SecurityPermission;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigSanityCheck;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+
+/**
+ * Access point for the default, site, ZooKeeper-backed system and per-table configuration objects used by server code. The static accessors check
+ * {@code CONFIGURATION_PERMISSION} when a {@link SecurityManager} is installed; an instance of this class binds those accessors to one {@link Instance}.
+ */
+public class ServerConfiguration {
+
+  // Cache of per-table configurations keyed by table id. Every access synchronizes on the map itself.
+  private static final Map<String,TableConfiguration> tableInstances = new HashMap<String,TableConfiguration>(1);
+  // Permission required to read configuration when a SecurityManager is present. A constant, hence final.
+  private static final SecurityPermission CONFIGURATION_PERMISSION = new SecurityPermission("configurationPermission");
+
+  /**
+   * Returns the site configuration layered over the default configuration.
+   */
+  public static synchronized SiteConfiguration getSiteConfiguration() {
+    checkPermissions();
+    return SiteConfiguration.getInstance(getDefaultConfiguration());
+  }
+
+  /**
+   * Asks the installed {@link SecurityManager}, if any, to check {@code CONFIGURATION_PERMISSION}. Does nothing when no security manager is installed.
+   */
+  private static void checkPermissions() {
+    SecurityManager sm = System.getSecurityManager();
+    if (sm != null) {
+      sm.checkPermission(CONFIGURATION_PERMISSION);
+    }
+  }
+
+  /**
+   * Returns the {@link ZooConfiguration} for {@code instance}, layered over the site configuration.
+   */
+  private static synchronized ZooConfiguration getZooConfiguration(Instance instance) {
+    checkPermissions();
+    return ZooConfiguration.getInstance(instance, getSiteConfiguration());
+  }
+
+  /**
+   * Returns the default configuration.
+   */
+  public static synchronized DefaultConfiguration getDefaultConfiguration() {
+    checkPermissions();
+    return DefaultConfiguration.getInstance();
+  }
+
+  /**
+   * Returns the system-wide configuration for {@code instance}, which is its {@link ZooConfiguration}.
+   */
+  public static synchronized AccumuloConfiguration getSystemConfiguration(Instance instance) {
+    return getZooConfiguration(instance);
+  }
+
+  /**
+   * Returns the cached configuration for {@code tableId}, creating, validating and caching it on first request.
+   * <p>
+   * The cache is keyed by table id only, so {@code instance} is consulted just when the entry is first created.
+   *
+   * @param instance
+   *          instance used to build a new table configuration when none is cached
+   * @param tableId
+   *          id of the table whose configuration is wanted
+   * @return the shared configuration object for the table
+   */
+  public static TableConfiguration getTableConfiguration(Instance instance, String tableId) {
+    checkPermissions();
+    synchronized (tableInstances) {
+      TableConfiguration conf = tableInstances.get(tableId);
+      if (conf == null) {
+        conf = new TableConfiguration(instance.getInstanceID(), tableId, getSystemConfiguration(instance));
+        // Validate before publishing so an invalid configuration is never cached.
+        ConfigSanityCheck.validate(conf);
+        tableInstances.put(tableId, conf);
+      }
+      return conf;
+    }
+  }
+
+  /**
+   * Drops the cached configuration for {@code tableId}, if any.
+   */
+  static void removeTableIdInstance(String tableId) {
+    synchronized (tableInstances) {
+      tableInstances.remove(tableId);
+    }
+  }
+
+  /**
+   * Tells the observers of every cached table configuration that the session has expired.
+   */
+  static void expireAllTableObservers() {
+    synchronized (tableInstances) {
+      // Only the configurations are needed, so walk the values rather than the entries.
+      for (TableConfiguration tableConf : tableInstances.values()) {
+        tableConf.expireAllObservers();
+      }
+    }
+  }
+
+  private final Instance instance;
+
+  /**
+   * Creates a view of the server configuration bound to {@code instance}.
+   */
+  public ServerConfiguration(Instance instance) {
+    this.instance = instance;
+  }
+
+  /**
+   * Returns the configuration of the table with the given id for the bound instance.
+   */
+  public TableConfiguration getTableConfiguration(String tableId) {
+    return getTableConfiguration(instance, tableId);
+  }
+
+  /**
+   * Returns the configuration of the table that {@code extent} belongs to.
+   */
+  public TableConfiguration getTableConfiguration(KeyExtent extent) {
+    return getTableConfiguration(extent.getTableId().toString());
+  }
+
+  /**
+   * Returns the {@link ZooConfiguration} of the bound instance.
+   */
+  public synchronized AccumuloConfiguration getConfiguration() {
+    return getZooConfiguration(instance);
+  }
+
+  /**
+   * Returns the instance this object was created for.
+   */
+  public Instance getInstance() {
+    return instance;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
new file mode 100644
index 0000000..c407309
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * ZooKeeper {@link Watcher} that maps events on paths below the instance's tables node to calls on {@link ServerConfiguration} and the affected
+ * {@link TableConfiguration}.
+ */
+class TableConfWatcher implements Watcher {
+  static {
+    Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
+    Logger.getLogger("org.apache.hadoop.io.compress").setLevel(Level.WARN);
+  }
+
+  private static final Logger log = Logger.getLogger(TableConfWatcher.class);
+  private final Instance instance;
+
+  TableConfWatcher(Instance instance) {
+    this.instance = instance;
+  }
+
+  /**
+   * Handles one watch event. When the event has a path, the table id (first path element after the tables prefix) and, for paths below the table's
+   * configuration node, the property key are extracted from it. Events with a path outside the tables prefix are logged and ignored.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    final String eventPath = event.getPath();
+    if (log.isTraceEnabled()) {
+      log.trace("WatchEvent : " + eventPath + " " + event.getState() + " " + event.getType());
+    }
+
+    final String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES + "/";
+
+    String tableId = null;
+    String key = null;
+
+    if (eventPath != null) {
+      if (!eventPath.startsWith(tablesPrefix)) {
+        log.warn("Zookeeper told me about a path I was not watching " + eventPath + " state=" + event.getState() + " type=" + event.getType());
+        return;
+      }
+
+      final String afterPrefix = eventPath.substring(tablesPrefix.length());
+      final int firstSlash = afterPrefix.indexOf('/');
+      if (firstSlash < 0) {
+        // The path names the table node itself.
+        tableId = afterPrefix;
+      } else {
+        tableId = afterPrefix.substring(0, firstSlash);
+        final String confPrefix = tablesPrefix + tableId + Constants.ZTABLE_CONF + "/";
+        if (eventPath.startsWith(confPrefix)) {
+          key = eventPath.substring(confPrefix.length());
+        }
+      }
+    }
+
+    switch (event.getType()) {
+      case NodeDataChanged: {
+        if (log.isTraceEnabled()) {
+          log.trace("EventNodeDataChanged " + event.getPath());
+        }
+        if (key != null) {
+          ServerConfiguration.getTableConfiguration(instance, tableId).propertyChanged(key);
+        }
+        break;
+      }
+      case NodeChildrenChanged: {
+        ServerConfiguration.getTableConfiguration(instance, tableId).propertiesChanged(key);
+        break;
+      }
+      case NodeDeleted: {
+        // Drop the cached table configuration only when the deleted path is not one of the table's properties.
+        if (key == null) {
+          ServerConfiguration.removeTableIdInstance(tableId);
+        }
+        break;
+      }
+      case None: {
+        switch (event.getState()) {
+          case Expired:
+            ServerConfiguration.expireAllTableObservers();
+            break;
+          case SyncConnected:
+          case Disconnected:
+            // nothing to do for plain connection state changes
+            break;
+          default:
+            log.warn("EventNone event not handled path = " + event.getPath() + " state=" + event.getState());
+        }
+        break;
+      }
+      case NodeCreated: {
+        switch (event.getState()) {
+          case SyncConnected:
+            break;
+          default:
+            log.warn("Event NodeCreated event not handled path = " + event.getPath() + " state=" + event.getState());
+        }
+        break;
+      }
+      default:
+        log.warn("Event not handled path = " + event.getPath() + " state=" + event.getState() + " type = " + event.getType());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
new file mode 100644
index 0000000..4c58153
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration of a single table. Per-table properties are read from ZooKeeper through one {@link ZooCache} shared by all instances of this class; a property
+ * that is absent or fails its type's format check is taken from the parent configuration instead.
+ */
+public class TableConfiguration extends AccumuloConfiguration {
+  private static final Logger log = Logger.getLogger(TableConfiguration.class);
+
+  // volatile: getTablePropCache() reads this field without holding the lock, so the write must be safely published.
+  private static volatile ZooCache tablePropCache = null;
+  private final String instanceId;
+  private final AccumuloConfiguration parent;
+
+  private final String table;
+  private final Set<ConfigurationObserver> observers;
+
+  /**
+   * @param instanceId
+   *          id of the instance, used to build the ZooKeeper root path
+   * @param table
+   *          id of the table this configuration describes
+   * @param parent
+   *          configuration consulted for properties the table does not set (or sets to an invalid value)
+   */
+  public TableConfiguration(String instanceId, String table, AccumuloConfiguration parent) {
+    this.instanceId = instanceId;
+    this.table = table;
+    this.parent = parent;
+
+    this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
+  }
+
+  /**
+   * Returns the shared property cache, creating it on first use with a {@link TableConfWatcher} for {@link HdfsZooInstance#getInstance()}.
+   */
+  private static ZooCache getTablePropCache() {
+    ZooCache cache = tablePropCache;
+    if (cache == null) {
+      synchronized (TableConfiguration.class) {
+        cache = tablePropCache;
+        if (cache == null) {
+          // The instance is only needed to build the cache, so look it up on this slow path only.
+          Instance inst = HdfsZooInstance.getInstance();
+          cache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
+          tablePropCache = cache;
+        }
+      }
+    }
+    return cache;
+  }
+
+  /**
+   * Copies the current observers into an array while holding the set's lock. Notification loops iterate the copy, so observers may be added or removed
+   * (by another thread, or by an observer from inside its own callback) without disturbing the loop.
+   */
+  private ConfigurationObserver[] snapshotObservers() {
+    synchronized (observers) {
+      return observers.toArray(new ConfigurationObserver[0]);
+    }
+  }
+
+  /**
+   * Registers an observer for changes to this table's configuration.
+   *
+   * @throws RuntimeException
+   *           if this configuration has no table id
+   */
+  public void addObserver(ConfigurationObserver co) {
+    if (table == null) {
+      String err = "Attempt to add observer for non-table configuration";
+      log.error(err);
+      throw new RuntimeException(err);
+    }
+    // Result intentionally unused. NOTE(review): presumably this read primes the ZooCache so that watches exist for the table's properties - confirm.
+    iterator();
+    observers.add(co);
+  }
+
+  /**
+   * Unregisters an observer.
+   *
+   * @throws RuntimeException
+   *           if this configuration has no table id
+   */
+  public void removeObserver(ConfigurationObserver configObserver) {
+    if (table == null) {
+      String err = "Attempt to remove observer for non-table configuration";
+      log.error(err);
+      throw new RuntimeException(err);
+    }
+    observers.remove(configObserver);
+  }
+
+  /**
+   * Notifies every registered observer that the session has expired.
+   */
+  public void expireAllObservers() {
+    for (ConfigurationObserver co : snapshotObservers()) {
+      co.sessionExpired();
+    }
+  }
+
+  /**
+   * Notifies every registered observer that the property {@code key} changed.
+   */
+  public void propertyChanged(String key) {
+    for (ConfigurationObserver co : snapshotObservers()) {
+      co.propertyChanged(key);
+    }
+  }
+
+  /**
+   * Notifies every registered observer that the set of properties changed.
+   *
+   * @param key
+   *          not used; observers are told only that something changed
+   */
+  public void propertiesChanged(String key) {
+    for (ConfigurationObserver co : snapshotObservers()) {
+      co.propertiesChanged();
+    }
+  }
+
+  /**
+   * Returns the table's value for {@code property}, or the parent's value when the table has none or its value does not match the property's type format
+   * (the latter case is logged as an error).
+   */
+  @Override
+  public String get(Property property) {
+    String key = property.getKey();
+    String value = get(key);
+
+    if (value == null || !property.getType().isValidFormat(value)) {
+      if (value != null) {
+        log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
+      }
+      value = parent.get(property);
+    }
+    return value;
+  }
+
+  /**
+   * Reads the raw per-table value stored for {@code key}, or {@code null} when the cache has no data for it.
+   */
+  private String get(String key) {
+    String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key;
+    byte[] v = getTablePropCache().get(zPath);
+    if (v == null) {
+      return null;
+    }
+    // NOTE(review): decodes with the platform default charset; confirm it matches the encoding used when the value is written.
+    return new String(v);
+  }
+
+  /**
+   * Iterates over the parent's entries overlaid with this table's own properties, ordered by key.
+   */
+  @Override
+  public Iterator<Entry<String,String>> iterator() {
+    TreeMap<String,String> entries = new TreeMap<String,String>();
+
+    for (Entry<String,String> parentEntry : parent) {
+      entries.put(parentEntry.getKey(), parentEntry.getValue());
+    }
+
+    String confPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF;
+    List<String> children = getTablePropCache().getChildren(confPath);
+    if (children != null) {
+      for (String child : children) {
+        if (child == null) {
+          continue;
+        }
+        String value = get(child);
+        if (value != null) {
+          entries.put(child, value);
+        }
+      }
+    }
+
+    return entries.entrySet().iterator();
+  }
+
+  /**
+   * Returns the id of the table this configuration describes.
+   */
+  public String getTableId() {
+    return table;
+  }
+}