You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:55:53 UTC
[14/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java b/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
deleted file mode 100644
index bc81786..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state.tables;
-
-import java.security.SecurityPermission;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.util.TablePropUtil;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class TableManager {
- private static SecurityPermission TABLE_MANAGER_PERMISSION = new SecurityPermission("tableManagerPermission");
-
- private static final Logger log = Logger.getLogger(TableManager.class);
- private static final Set<TableObserver> observers = Collections.synchronizedSet(new HashSet<TableObserver>());
- private static final Map<String,TableState> tableStateCache = Collections.synchronizedMap(new HashMap<String,TableState>());
-
- private static TableManager tableManager = null;
-
- private final Instance instance;
- private ZooCache zooStateCache;
-
- /**
-  * Writes the ZooKeeper nodes that describe a table. Nodes are written in this order: the table node itself, then its configuration, name, state, flush id,
-  * compaction id and compaction-cancel id children. The three id nodes are initialised to "0".
-  *
-  * @param instanceId
-  *          instance id used to build the ZooKeeper path
-  * @param tableId
-  *          id of the table; becomes the name of the table node
-  * @param tableName
-  *          value stored in the name node
-  * @param state
-  *          state whose enum name is stored in the state node
-  * @param existsPolicy
-  *          policy handed to every write for the case where the node already exists
-  */
- public static void prepareNewTableState(String instanceId, String tableId, String tableName, TableState state, NodeExistsPolicy existsPolicy)
-     throws KeeperException, InterruptedException {
-   final String tableNode = Constants.ZROOT + "/" + instanceId + Constants.ZTABLES + "/" + tableId;
-   final IZooReaderWriter writer = ZooReaderWriter.getRetryingInstance();
-
-   // the table node and its configuration parent carry no data
-   writer.putPersistentData(tableNode, new byte[0], existsPolicy);
-   writer.putPersistentData(tableNode + Constants.ZTABLE_CONF, new byte[0], existsPolicy);
-
-   writer.putPersistentData(tableNode + Constants.ZTABLE_NAME, tableName.getBytes(), existsPolicy);
-   writer.putPersistentData(tableNode + Constants.ZTABLE_STATE, state.name().getBytes(), existsPolicy);
-
-   // counters all start at zero
-   writer.putPersistentData(tableNode + Constants.ZTABLE_FLUSH_ID, "0".getBytes(), existsPolicy);
-   writer.putPersistentData(tableNode + Constants.ZTABLE_COMPACT_ID, "0".getBytes(), existsPolicy);
-   writer.putPersistentData(tableNode + Constants.ZTABLE_COMPACT_CANCEL_ID, "0".getBytes(), existsPolicy);
- }
-
- /**
-  * Returns the shared TableManager, creating it on first use. When a SecurityManager is installed the caller must hold the table manager permission.
-  */
- public static synchronized TableManager getInstance() {
-   final SecurityManager securityManager = System.getSecurityManager();
-   if (securityManager != null) {
-     securityManager.checkPermission(TABLE_MANAGER_PERMISSION);
-   }
-
-   if (tableManager == null) {
-     tableManager = new TableManager();
-   }
-   return tableManager;
- }
-
- /**
-  * Binds to the HdfsZooInstance, creates a ZooCache that reports events to a TableStateWatcher, and fills the table state cache from ZooKeeper.
-  */
- private TableManager() {
- instance = HdfsZooInstance.getInstance();
- zooStateCache = new ZooCache(new TableStateWatcher());
- updateTableStateCache();
- }
-
- /**
-  * Looks up a table's state in the in-memory cache only; ZooKeeper is not consulted here.
-  *
-  * @return the cached state, or null when the cache holds no entry for tableId
-  */
- public TableState getTableState(String tableId) {
- return tableStateCache.get(tableId);
- }
-
- /**
-  * Signals that a table was asked to move between two states for which no transition is allowed.
-  */
- public static class IllegalTableTransitionException extends Exception {
-   private static final long serialVersionUID = 1L;
-
-   final TableState oldState;
-   final TableState newState;
-
-   /**
-    * @param oldState
-    *          state the table was in when the transition was attempted
-    * @param newState
-    *          state that was requested
-    */
-   public IllegalTableTransitionException(TableState oldState, TableState newState) {
-     // Supply a message so the rejected transition is visible in logs and stack traces; without it getMessage() is null.
-     super("Error transitioning from " + oldState + " state to " + newState + " state");
-     this.oldState = oldState;
-     this.newState = newState;
-   }
-
-   /** @return the state the table was in when the transition was attempted */
-   public TableState getOldState() {
-     return oldState;
-   }
-
-   /** @return the state that was requested */
-   public TableState getNewState() {
-     return newState;
-   }
-
- }
-
- /**
-  * Moves a table to {@code newState} by mutating its state node in ZooKeeper.
-  * <p>
-  * Allowed transitions: from NEW only to ONLINE or OFFLINE; from ONLINE, OFFLINE or UNKNOWN (UNKNOWN is also assumed when the node holds no data) to any
-  * state except NEW; from DELETING to nothing.
-  *
-  * @param tableId
-  *          id of the table whose state node is mutated
-  * @param newState
-  *          requested state
-  * @throws RuntimeException
-  *           wrapping whatever exception the mutation raised, including a rejected transition
-  */
- public synchronized void transitionTableState(final String tableId, final TableState newState) {
-   String statePath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE;
-
-   try {
-     ZooReaderWriter.getRetryingInstance().mutate(statePath, newState.name().getBytes(), ZooUtil.PUBLIC, new Mutator() {
-       @Override
-       public byte[] mutate(byte[] oldData) throws Exception {
-         TableState oldState = TableState.UNKNOWN;
-         if (oldData != null)
-           oldState = TableState.valueOf(new String(oldData));
-         boolean transition = true;
-         // +--------+
-         // v |
-         // NEW -> (ONLINE|OFFLINE)+--- DELETING
-         switch (oldState) {
-           case NEW:
-             transition = (newState == TableState.OFFLINE || newState == TableState.ONLINE);
-             break;
-           case ONLINE: // fall-through intended
-           case UNKNOWN:// fall through intended
-           case OFFLINE:
-             transition = (newState != TableState.NEW);
-             break;
-           case DELETING:
-             // Can't transition to any state from DELETING
-             transition = false;
-             break;
-         }
-         if (!transition)
-           throw new IllegalTableTransitionException(oldState, newState);
-         log.debug("Transitioning state for table " + tableId + " from " + oldState + " to " + newState);
-         return newState.name().getBytes();
-       }
-     });
-   } catch (Exception e) {
-     // Log the table id and the cause; logging only the target state hid which table failed and why.
-     log.fatal("Failed to transition table " + tableId + " to state " + newState, e);
-     throw new RuntimeException(e);
-   }
- }
-
- private void updateTableStateCache() {
- synchronized (tableStateCache) {
- for (String tableId : zooStateCache.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES))
- if (zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE) != null)
- updateTableStateCache(tableId);
- }
- }
-
- public TableState updateTableStateCache(String tableId) {
- synchronized (tableStateCache) {
- TableState tState = TableState.UNKNOWN;
- byte[] data = zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE);
- if (data != null) {
- String sState = new String(data);
- try {
- tState = TableState.valueOf(sState);
- } catch (IllegalArgumentException e) {
- log.error("Unrecognized state for table with tableId=" + tableId + ": " + sState);
- }
- tableStateCache.put(tableId, tState);
- }
- return tState;
- }
- }
-
- /**
-  * Creates the ZooKeeper nodes for a table in state NEW under this manager's instance and then refreshes the cached state for that table.
-  */
- public void addTable(String tableId, String tableName, NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException {
- prepareNewTableState(instance.getInstanceID(), tableId, tableName, TableState.NEW, existsPolicy);
- updateTableStateCache(tableId);
- }
-
- /**
-  * Creates the ZooKeeper nodes for a new table in state NEW and seeds its configuration from an existing table.
-  * <p>
-  * The source table's configuration subtree is copied over the new table's (overwriting), then {@code propertiesToSet} are applied, then each property in
-  * {@code propertiesToExclude} is deleted from the new table's configuration. Finally the cached state of the new table is refreshed.
-  *
-  * @param srcTable
-  *          id of the table whose configuration is copied
-  * @param tableId
-  *          id of the table being created
-  */
- public void cloneTable(String srcTable, String tableId, String tableName, Map<String,String> propertiesToSet, Set<String> propertiesToExclude,
-     NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException {
-   prepareNewTableState(instance.getInstanceID(), tableId, tableName, TableState.NEW, existsPolicy);
-
-   final String tablesRoot = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/";
-   final String sourceConf = tablesRoot + srcTable + Constants.ZTABLE_CONF;
-   final String cloneConf = tablesRoot + tableId + Constants.ZTABLE_CONF;
-   ZooReaderWriter.getRetryingInstance().recursiveCopyPersistent(sourceConf, cloneConf, NodeExistsPolicy.OVERWRITE);
-
-   for (Entry<String,String> override : propertiesToSet.entrySet()) {
-     TablePropUtil.setTableProperty(tableId, override.getKey(), override.getValue());
-   }
-
-   for (String excluded : propertiesToExclude) {
-     ZooReaderWriter.getRetryingInstance().recursiveDelete(cloneConf + "/" + excluded, NodeMissingPolicy.SKIP);
-   }
-
-   updateTableStateCache(tableId);
- }
-
- /**
-  * Drops a table from the state cache and deletes its nodes from ZooKeeper: first the state node, then the whole table subtree. Missing nodes are skipped.
-  */
- public void removeTable(String tableId) throws KeeperException, InterruptedException {
-   synchronized (tableStateCache) {
-     tableStateCache.remove(tableId);
-
-     final String tableNode = ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId;
-     ZooReaderWriter.getRetryingInstance().recursiveDelete(tableNode + Constants.ZTABLE_STATE, NodeMissingPolicy.SKIP);
-     ZooReaderWriter.getRetryingInstance().recursiveDelete(tableNode, NodeMissingPolicy.SKIP);
-   }
- }
-
- /**
-  * Registers an observer after handing it a read-only view of the current table states.
-  * <p>
-  * Both the observers set and the state cache are locked (in that order) while the observer is initialized and added.
-  *
-  * @return the result of adding the observer to the observer set
-  */
- public boolean addObserver(TableObserver to) {
- synchronized (observers) {
- synchronized (tableStateCache) {
- // unmodifiable view of the live cache, not a snapshot copy
- to.initialize(Collections.unmodifiableMap(tableStateCache));
- return observers.add(to);
- }
- }
- }
-
- /**
-  * Unregisters an observer.
-  *
-  * @return the result of removing the observer from the observer set
-  */
- public boolean removeObserver(TableObserver to) {
- return observers.remove(to);
- }
-
- /**
-  * Watcher handed to the ZooCache. It keeps the table state cache in step with ZooKeeper and forwards table state changes and session expiry to the
-  * registered observers.
-  */
- private class TableStateWatcher implements Watcher {
-   @Override
-   public void process(WatchedEvent event) {
-     if (log.isTraceEnabled())
-       log.trace(event);
-
-     final String zPath = event.getPath();
-     final EventType zType = event.getType();
-
-     String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES;
-     String tableId = null;
-
-     // Below the tables node only a table's state node is of interest. tableId is set only for such a path; any other path below the tables node is
-     // reported and ignored.
-     if (zPath != null && zPath.startsWith(tablesPrefix + "/")) {
-       String suffix = zPath.substring(tablesPrefix.length() + 1);
-       if (suffix.contains("/")) {
-         String[] sa = suffix.split("/", 2);
-         if (Constants.ZTABLE_STATE.equals("/" + sa[1]))
-           tableId = sa[0];
-       }
-       if (tableId == null) {
-         log.warn("Unknown path in " + event);
-         return;
-       }
-     }
-
-     switch (zType) {
-       case NodeChildrenChanged:
-         if (zPath != null && zPath.equals(tablesPrefix)) {
-           updateTableStateCache();
-         } else {
-           log.warn("Unexpected path " + zPath);
-         }
-         break;
-       case NodeCreated:
-       case NodeDataChanged:
-         if (tableId == null) {
-           // The event is not for a table state node (for example the tables node itself). Without this guard a null table id would be looked up in
-           // ZooKeeper and reported to every observer.
-           log.warn("Unexpected path " + zPath);
-           break;
-         }
-         // state transition
-         TableState tState = updateTableStateCache(tableId);
-         log.debug("State transition to " + tState + " @ " + event);
-         synchronized (observers) {
-           for (TableObserver to : observers)
-             to.stateChanged(tableId, tState);
-         }
-         break;
-       case NodeDeleted:
-         // tableId is only ever set for a state-node path, so a non-null id already means the table's state node was deleted.
-         if (tableId != null)
-           tableStateCache.remove(tableId);
-         break;
-       case None:
-         switch (event.getState()) {
-           case Expired:
-             if (log.isTraceEnabled())
-               log.trace("Session expired " + event);
-             synchronized (observers) {
-               for (TableObserver to : observers)
-                 to.sessionExpired();
-             }
-             break;
-           case SyncConnected:
-           default:
-             if (log.isTraceEnabled())
-               log.trace("Ignored " + event);
-         }
-         break;
-       default:
-         log.warn("Unhandled " + event);
-     }
-   }
- }
-
- /*
- * private static boolean verifyTabletAssignments(String tableId) { log.info( "Sending message to load balancer to verify assignment of tablets with tableId="
- * + tableId); // Return true only if transitions to other states did not interrupt // this process. (like deleting the table) return true; }
- *
- * private static synchronized boolean unloadTable(String tableId) { int loadedTabletCount = 0; while (loadedTabletCount > 0) { // wait for tables to be
- * unloaded } log.info("Table unloaded. tableId=" + tableId); return true; }
- *
- * private static void cleanupDeletedTable(String tableId) { log.info("Sending message to cleanup the deleted table with tableId=" + tableId); }
- *
- * switch (tState) { case NEW: // this should really only happen before the watcher // knows about the table log.error("Unexpected transition to " + tState +
- * " @ " + event); break;
- *
- * case LOADING: // a table has started coming online or has pending // migrations (maybe?) if (verifyTabletAssignments(tableId))
- * TableState.transition(instance, tableId, TableState.ONLINE); break; case ONLINE: log.trace("Table online with tableId=" + tableId); break;
- *
- * case DISABLING: if (unloadTable(tableId)) TableState.transition(instance, tableId, TableState.DISABLED); break; case DISABLED:
- * log.trace("Table disabled with tableId=" + tableId); break;
- *
- * case UNLOADING: unloadTable(tableId); TableState.transition(instance, tableId, TableState.OFFLINE); case OFFLINE: break;
- *
- * case DELETING: unloadTable(tableId); cleanupDeletedTable(tableId); break;
- *
- * default: log.error("Unrecognized transition to " + tState + " @ " + event); }
- */
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableObserver.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableObserver.java b/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableObserver.java
deleted file mode 100644
index ea13585..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableObserver.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state.tables;
-
-import java.util.Map;
-
-import org.apache.accumulo.core.master.state.tables.TableState;
-
-/**
- * Callback interface for components that want to follow table state. TableManager invokes these methods for observers registered through its addObserver
- * method.
- */
-public interface TableObserver {
- /**
-  * Called by TableManager when the observer is registered.
-  *
-  * @param tableIdToStateMap
-  *          table id to state mapping; TableManager passes an unmodifiable view of its state cache
-  */
- void initialize(Map<String,TableState> tableIdToStateMap);
-
- /**
-  * Called by TableManager's watcher after a table's state node was created or its data changed.
-  *
-  * @param tableId
-  *          id of the affected table
-  * @param tState
-  *          state read back after the change
-  */
- void stateChanged(String tableId, TableState tState);
-
- /**
-  * Called by TableManager's watcher when it receives a session-expired event.
-  */
- void sessionExpired();
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java b/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
deleted file mode 100644
index 5ef90ab..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.tableOps;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.ServerClient;
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.ClientService;
-import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
-import org.apache.accumulo.core.client.impl.thrift.TableOperation;
-import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
-import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
-import org.apache.accumulo.trace.instrument.TraceExecutorService;
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-
-/*
- * Bulk import makes requests of tablet servers, and those requests can take a
- * long time. Our communications to the tablet server may fail, so we won't know
- * the status of the request. The master will repeat failed requests so now
- * there are multiple requests to the tablet server. The tablet server will not
- * execute the request multiple times, so long as the marker it wrote in the
- * metadata table stays there. The master needs to know when all requests have
- * finished so it can remove the markers. Did it start? Did it finish? We can see
- * that *a* request completed by seeing the flag written into the metadata
- * table, but we won't know if some other rogue thread is still waiting to start
- * a thread and repeat the operation.
- *
- * The master can ask the tablet server if it has any requests still running.
- * Except the tablet server might have some thread about to start a request, but
- * before it has made any bookkeeping about the request. To prevent problems
- * like this, an Arbitrator is used. Before starting any new request, the tablet
- * server checks the Arbitrator to see if the request is still valid.
- *
- */
-
-public class BulkImport extends MasterRepo {
- public static final String FAILURES_TXT = "failures.txt";
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger log = Logger.getLogger(BulkImport.class);
-
- private String tableId;
- private String sourceDir;
- private String errorDir;
- private boolean setTime;
-
- /**
-  * @param tableId
-  *          id of the table to import into
-  * @param sourceDir
-  *          directory whose files are moved into a new bulk directory for loading
-  * @param errorDir
-  *          directory for failures; call() requires it to exist, be a directory and be empty
-  * @param setTime
-  *          passed through unchanged to the LoadFiles step
-  */
- public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
- this.tableId = tableId;
- this.sourceDir = sourceDir;
- this.errorDir = errorDir;
- this.setTime = setTime;
- }
-
- @Override
- public long isReady(long tid, Master master) throws Exception {
- if (!Utils.getReadLock(tableId, tid).tryLock())
- return 100;
-
- Instance instance = HdfsZooInstance.getInstance();
- Tables.clearCache(instance);
- if (Tables.getTableState(instance, tableId) == TableState.ONLINE) {
- long reserve1, reserve2;
- reserve1 = reserve2 = Utils.reserveHdfsDirectory(sourceDir, tid);
- if (reserve1 == 0)
- reserve2 = Utils.reserveHdfsDirectory(errorDir, tid);
- return reserve2;
- } else {
- throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
- }
- }
-
- /**
-  * Validates the error directory, starts the bulk arbitrator for this transaction and moves the source files into a fresh bulk directory.
-  *
-  * @return the LoadFiles step that continues the import
-  * @throws ThriftTableOperationException
-  *           with type BULK_BAD_ERROR_DIRECTORY when the error directory is missing, not a directory or not empty; with type BULK_BAD_INPUT_DIRECTORY when
-  *           preparing the bulk directory fails with an IOException
-  */
- @Override
- // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
- @SuppressWarnings("deprecation")
- public Repo<Master> call(long tid, Master master) throws Exception {
-   log.debug(" tid " + tid + " sourceDir " + sourceDir);
-
-   Utils.getReadLock(tableId, tid).lock();
-
-   final VolumeManager fs = master.getFileSystem();
-   final Path errorPath = new Path(errorDir);
-
-   FileStatus errorStatus;
-   try {
-     errorStatus = fs.getFileStatus(errorPath);
-   } catch (FileNotFoundException ex) {
-     // a missing directory is reported below
-     errorStatus = null;
-   }
-
-   // the error directory must exist, be a directory and be empty
-   String problem = null;
-   if (errorStatus == null) {
-     problem = " does not exist";
-   } else if (!errorStatus.isDir()) {
-     problem = " is not a directory";
-   } else if (fs.listStatus(errorPath).length != 0) {
-     problem = " is not empty";
-   }
-   if (problem != null) {
-     throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
-         + problem);
-   }
-
-   ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
-
-   // move the files into the bulk directory
-   try {
-     final String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
-     log.debug(" tid " + tid + " bulkDir " + bulkDir);
-     return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
-   } catch (IOException ex) {
-     log.error("error preparing the bulk import directory", ex);
-     throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, sourceDir + ": "
-         + ex);
-   }
- }
-
- /**
-  * Creates a new, uniquely named bulk directory for the table below the tables directory that matches the source directory.
-  * <p>
-  * Directory names come from the UniqueNameAllocator. If creating the directory fails, a warning is logged and a new name is tried after a three second
-  * pause; the loop only ends on success or on an exception.
-  *
-  * @return path of the newly created bulk directory
-  * @throws IllegalStateException
-  *           when no tables directory matches the source directory, or when a freshly allocated directory name already exists
-  */
- private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
-
-   // Check the lookup result before dereferencing it. Calling toString() first meant a failed lookup surfaced as a NullPointerException and the intended
-   // IllegalStateException below could never be thrown.
-   Path matchedTablesDir = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
-   if (matchedTablesDir == null)
-     throw new IllegalStateException(sourceDir + " is not in a known namespace");
-
-   Path directory = new Path(matchedTablesDir.toString() + "/" + tableId);
-   fs.mkdirs(directory);
-
-   UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-
-   while (true) {
-     Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
-     if (fs.exists(newBulkDir)) // sanity check
-       throw new IllegalStateException("Dir exist when it should not " + newBulkDir);
-     if (fs.mkdirs(newBulkDir))
-       return newBulkDir;
-     log.warn("Failed to create " + newBulkDir + " for unknown reason");
-
-     UtilWaitThread.sleep(3000);
-   }
- }
-
- /**
-  * Creates a new bulk directory for the table, records a bulk-load-in-progress flag for it, and moves every importable file from {@code dir} into it under a
-  * newly allocated name of the form "I" + unique name + "." + extension.
-  * <p>
-  * Entries are skipped (with a log message) when their extension is not valid, or when they are treated as map files but are not a directory containing a
-  * non-directory data file. A file whose move throws an IOException is logged and the remaining files are still processed.
-  *
-  * @return the bulk directory path as a string
-  */
- // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
- @SuppressWarnings("deprecation")
- private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
- Path bulkDir = createNewBulkDir(fs, tableId);
-
- // the flag is keyed by "/<parent directory name>/<bulk directory name>"
- MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-
- Path dirPath = new Path(dir);
- FileStatus[] mapFiles = fs.listStatus(dirPath);
-
- UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-
- for (FileStatus fileStatus : mapFiles) {
- // the extension is whatever follows the last dot in the file name
- String sa[] = fileStatus.getPath().getName().split("\\.");
- String extension = "";
- if (sa.length > 1) {
- extension = sa[sa.length - 1];
-
- if (!FileOperations.getValidExtensions().contains(extension)) {
- log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
- continue;
- }
- } else {
- // assume it is a map file
- extension = Constants.MAPFILE_EXTENSION;
- }
-
- // a map file must be a directory that contains a plain data file
- if (extension.equals(Constants.MAPFILE_EXTENSION)) {
- if (!fileStatus.isDir()) {
- log.warn(fileStatus.getPath() + " is not a map file, ignoring");
- continue;
- }
-
- if (fileStatus.getPath().getName().equals("_logs")) {
- log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping");
- continue;
- }
- try {
- FileStatus dataStatus = fs.getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME));
- if (dataStatus.isDir()) {
- log.warn(fileStatus.getPath() + " is not a map file, ignoring");
- continue;
- }
- } catch (FileNotFoundException fnfe) {
- log.warn(fileStatus.getPath() + " is not a map file, ignoring");
- continue;
- }
- }
-
- String newName = "I" + namer.getNextName() + "." + extension;
- Path newPath = new Path(bulkDir, newName);
- try {
- fs.rename(fileStatus.getPath(), newPath);
- log.debug("Moved " + fileStatus.getPath() + " to " + newPath);
- } catch (IOException E1) {
- // best effort: the failure is logged and the loop moves on to the next file
- log.error("Could not move: " + fileStatus.getPath().toString() + " " + E1.getMessage());
- }
- }
- return bulkDir.toString();
- }
-
- /**
-  * Releases what this step acquired: the reservations on the source and error directories and the table read lock.
-  */
- @Override
- public void undo(long tid, Master environment) throws Exception {
- // unreserve source/error directories
- Utils.unreserveHdfsDirectory(sourceDir, tid);
- Utils.unreserveHdfsDirectory(errorDir, tid);
- Utils.getReadLock(tableId, tid).unlock();
- }
-}
-
-/**
- * Last step of a bulk import: removes the in-progress flag and metadata markers, releases the directory reservations and the table read lock, and cleans up
- * the arbitrator entry for the transaction.
- */
-class CleanUpBulkImport extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  /**
-   * Runs the cleanup in a fixed order and ends the repo chain.
-   *
-   * @return always null
-   */
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    // 1. drop the in-progress flag and schedule the bulk directory for deletion
-    log.debug("removing the bulk processing flag file in " + bulk);
-    final Path bulkPath = new Path(bulk);
-    final String bulkDirName = bulkPath.getName();
-    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkPath.getParent().getName() + "/" + bulkDirName);
-    MetadataTableUtil.addDeleteEntry(tableId, "/" + bulkDirName);
-
-    // 2. remove the loaded markers written for this transaction
-    log.debug("removing the metadata table markers for loaded files");
-    MetadataTableUtil.removeBulkLoadEntries(master.getConnector(), tableId, tid);
-
-    // 3. give back the reservations and the read lock
-    log.debug("releasing HDFS reservations for " + source + " and " + error);
-    Utils.unreserveHdfsDirectory(source, tid);
-    Utils.unreserveHdfsDirectory(error, tid);
-    Utils.getReadLock(tableId, tid).unlock();
-
-    // 4. forget the transaction in the arbitrator
-    log.debug("completing bulk import transaction " + tid);
-    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return null;
-  }
-}
-
-/**
- * Bulk import step that stops the arbitrator for the transaction and hands over to CopyFailed.
- */
-class CompleteBulkImport extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CompleteBulkImport(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- /**
-  * Stops the bulk arbitrator for {@code tid}.
-  *
-  * @return a CopyFailed step built from the same table id and directories
-  */
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
- return new CopyFailed(tableId, source, bulk, error);
- }
-}
-
-class CopyFailed extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- /**
-  * @param tableId
-  *          id of the table being imported into
-  * @param source
-  *          source directory; only passed on to the CleanUpBulkImport step
-  * @param bulk
-  *          bulk directory; only passed on to the CleanUpBulkImport step
-  * @param error
-  *          directory that holds the failures file and receives the failed files
-  */
- public CopyFailed(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public long isReady(long tid, Master master) throws Exception {
- Set<TServerInstance> finished = new HashSet<TServerInstance>();
- Set<TServerInstance> running = master.onlineTabletServers();
- for (TServerInstance server : running) {
- try {
- TServerConnection client = master.getConnection(server);
- if (client != null && !client.isActive(tid))
- finished.add(server);
- } catch (TException ex) {
- log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
- }
- }
- if (finished.containsAll(running))
- return 0;
- return 500;
- }
-
- /**
-  * Deals with the files listed in the failures file of the error directory, then hands over to CleanUpBulkImport.
-  * <p>
-  * When there is no failures file the cleanup step is returned immediately. Otherwise each listed file that is not already present in the error directory is
-  * either renamed into the error directory (when the metadata table has no loaded marker for it from this transaction) or queued on the distributed work
-  * queue with a "source,destination" payload (when it has such a marker). The method waits for the queued work to finish and finally deletes the failures
-  * file.
-  *
-  * @return the CleanUpBulkImport step
-  */
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- // This needs to execute after the arbiter is stopped
-
- VolumeManager fs = master.getFileSystem();
-
- if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
- return new CleanUpBulkImport(tableId, source, bulk, error);
-
- // both maps are keyed by "/<parent directory name>/<file name>" and hold the line read from the failures file
- HashMap<String,String> failures = new HashMap<String,String>();
- HashMap<String,String> loadedFailures = new HashMap<String,String>();
-
- FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
- BufferedReader in = new BufferedReader(new InputStreamReader(failFile));
- try {
- String line = null;
- while ((line = in.readLine()) != null) {
- Path path = new Path(line);
- // files already present in the error directory need no further handling
- if (!fs.exists(new Path(error, path.getName())))
- failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
- }
- } finally {
- failFile.close();
- }
-
- /*
- * I thought I could move files that have no file references in the table. However, it is possible that a clone references a file. Therefore only move
- * files that have no loaded markers.
- */
-
- // determine which failed files were loaded
- Connector conn = master.getConnector();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
- mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
- for (Entry<Key,Value> entry : mscanner) {
- // only markers whose value equals this transaction id are considered
- if (Long.parseLong(entry.getValue().toString()) == tid) {
- String loadedFile = entry.getKey().getColumnQualifier().toString();
- String absPath = failures.remove(loadedFile);
- if (absPath != null) {
- loadedFailures.put(loadedFile, absPath);
- }
- }
- }
-
- // move failed files that were not loaded
- for (String failure : failures.values()) {
- Path orig = new Path(failure);
- Path dest = new Path(error, orig.getName());
- fs.rename(orig, dest);
- log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
- }
-
- // failed files that were loaded are handed to the work queue instead of being renamed
- if (loadedFailures.size() > 0) {
- DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
- + Constants.ZBULK_FAILED_COPYQ);
-
- HashSet<String> workIds = new HashSet<String>();
-
- for (String failure : loadedFailures.values()) {
- Path orig = new Path(failure);
- Path dest = new Path(error, orig.getName());
-
- if (fs.exists(dest))
- continue;
-
- bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
- workIds.add(orig.getName());
- log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
- }
-
- bifCopyQueue.waitUntilDone(workIds);
- }
-
- fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
- return new CleanUpBulkImport(tableId, source, bulk, error);
- }
-
-}
-
-/**
- * Bulk import step that asks tablet servers to load every file found in the bulk directory, retrying files that could not be loaded, and then records
- * the files that are still unloaded in the failures file of the error directory.
- */
-class LoadFiles extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  // Shared by all LoadFiles instances; created lazily by initializeThreadPool(Master).
-  private static ExecutorService threadPool = null;
-  private static final Logger log = Logger.getLogger(BulkImport.class);
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String errorDir;
-  private boolean setTime;
-
-  /**
-   * @param tableId
-   *          id of the table the files are imported into
-   * @param source
-   *          passed through unchanged to the next step
-   * @param bulk
-   *          directory whose entries are the files to load
-   * @param errorDir
-   *          directory that receives the failures file; must be writable
-   * @param setTime
-   *          passed through to the tablet servers' bulk import call
-   */
-  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.errorDir = errorDir;
-    this.setTime = setTime;
-  }
-
-  /**
-   * Ready as soon as at least one tablet server is online.
-   *
-   * @return 0 when a tablet server is online, otherwise 500
-   */
-  @Override
-  public long isReady(long tid, Master master) throws Exception {
-    if (master.onlineTabletServers().size() == 0)
-      return 500;
-    return 0;
-  }
-
-  /**
-   * Creates the shared thread pool on first use. The method is static so that the monitor it synchronizes on (the class) guards the static
-   * {@code threadPool} field for every instance; an instance-level lock would let two instances each create a pool.
-   *
-   * @param master
-   *          source of the configured pool size
-   */
-  static synchronized void initializeThreadPool(Master master) {
-    if (threadPool == null) {
-      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
-      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
-      pool.allowCoreThreadTimeOut(true);
-      threadPool = new TraceExecutorService(pool);
-    }
-  }
-
-  /**
-   * Submits one bulk import request per file to the thread pool, repeating for files that were not loaded until none remain or the configured number
-   * of retries is used up, then writes the remaining files to the failures file.
-   *
-   * @param tid
-   *          transaction id, passed to the tablet servers and used in log messages
-   * @param master
-   *          supplies the file system, configuration and instance
-   * @return the {@link CompleteBulkImport} step
-   * @throws ThriftTableOperationException
-   *           if the error directory cannot be written to
-   */
-  @Override
-  public Repo<Master> call(final long tid, final Master master) throws Exception {
-    initializeThreadPool(master);
-    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
-    VolumeManager fs = master.getFileSystem();
-    List<FileStatus> files = new ArrayList<FileStatus>();
-    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
-      files.add(entry);
-    }
-    log.debug("tid " + tid + " importing " + files.size() + " files");
-
-    // probe that the error directory is writable by creating and removing a marker file
-    Path writable = new Path(this.errorDir, ".iswritable");
-    if (!fs.createNewFile(writable)) {
-      // Maybe this is a re-try... clear the flag and try again
-      fs.delete(writable);
-      if (!fs.createNewFile(writable))
-        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
-            "Unable to write to " + this.errorDir);
-    }
-    fs.delete(writable);
-
-    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
-    for (FileStatus f : files)
-      filesToLoad.add(f.getPath().toString());
-
-    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
-    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
-      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
-      if (master.onlineTabletServers().size() == 0)
-        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
-      while (master.onlineTabletServers().size() == 0) {
-        UtilWaitThread.sleep(500);
-      }
-
-      // Use the threadpool to assign files one-at-a-time to the server
-      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
-      for (final String file : filesToLoad) {
-        results.add(threadPool.submit(new Callable<List<String>>() {
-          @Override
-          public List<String> call() {
-            List<String> failures = new ArrayList<String>();
-            ClientService.Client client = null;
-            String server = null;
-            try {
-              // get a connection to a random tablet server, do not prefer cached connections because
-              // this is running on the master and there are lots of connections to tablet servers
-              // serving the !METADATA tablets
-              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
-              client = pair.getSecond();
-              server = pair.getFirst();
-              List<String> single = Collections.singletonList(file);
-              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
-              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, single,
-                  errorDir, setTime);
-              if (fail.isEmpty()) {
-                loaded.add(file);
-              } else {
-                failures.addAll(fail);
-              }
-            } catch (Exception ex) {
-              // the file stays in filesToLoad and is retried on the next attempt
-              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
-            } finally {
-              ServerClient.close(client);
-            }
-            return failures;
-          }
-        }));
-      }
-      // wait for every request of this attempt to finish; the returned lists are not needed because
-      // the retry set is derived from "loaded"
-      for (Future<List<String>> f : results)
-        f.get();
-      filesToLoad.removeAll(loaded);
-      if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
-        UtilWaitThread.sleep(100);
-      }
-    }
-
-    // record whatever is still unloaded, one path per line (the file is written even when empty)
-    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
-    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile));
-    try {
-      for (String f : filesToLoad) {
-        out.write(f);
-        out.write("\n");
-      }
-    } finally {
-      out.close();
-    }
-
-    // return the next step, which will perform cleanup
-    return new CompleteBulkImport(tableId, source, bulk, errorDir);
-  }
-
-  /**
-   * Renders at most {@code max} elements of a collection as {@code [a, b, c]}, appending {@code ...} as a final entry when elements were left out.
-   * An empty collection yields {@code []}.
-   *
-   * @param potentiallyLongList
-   *          the elements to render, in iteration order
-   * @param max
-   *          maximum number of elements to include
-   * @return the bracketed, comma separated sample
-   */
-  static String sampleList(Collection<?> potentiallyLongList, int max) {
-    StringBuilder result = new StringBuilder("[");
-    int shown = 0;
-    for (Object obj : potentiallyLongList) {
-      if (shown > 0)
-        result.append(", ");
-      if (shown >= max) {
-        // more elements exist than we are willing to print
-        result.append("...");
-        break;
-      }
-      result.append(obj);
-      shown++;
-    }
-    result.append("]");
-    return result.toString();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/tableOps/CancelCompactions.java b/server/src/main/java/org/apache/accumulo/server/master/tableOps/CancelCompactions.java
deleted file mode 100644
index 7d06639..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/CancelCompactions.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.tableOps;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.impl.thrift.TableOperation;
-import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-
-/**
- * Final step of a compaction cancel: gives back the read lock held on the table for this transaction.
- */
-class FinishCancelCompaction extends MasterRepo {
-  private static final long serialVersionUID = 1L;
-  private String tableId;
-
-  public FinishCancelCompaction(String tableId) {
-    this.tableId = tableId;
-  }
-
-  /**
-   * Unlocks the table read lock obtained for {@code tid}.
-   *
-   * @return always {@code null}; there is no further step
-   */
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    Repo<Master> nextStep = null;
-    Utils.getReadLock(tableId, tid).unlock();
-    return nextStep;
-  }
-
-  /** Nothing to roll back. */
-  @Override
-  public void undo(long tid, Master environment) throws Exception {
-    // intentionally empty
-  }
-}
-
-/**
- * Table operation that cancels compactions of a table by raising the table's compaction-cancel id in ZooKeeper to the table's current compaction id.
- */
-public class CancelCompactions extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-  private String tableId;
-
-  public CancelCompactions(String tableId) {
-    this.tableId = tableId;
-  }
-
-  /**
-   * Reserves the table for {@link TableOperation#COMPACT_CANCEL}.
-   *
-   * @return whatever {@code Utils.reserveTable} reports
-   */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    long reservation = Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT_CANCEL);
-    return reservation;
-  }
-
-  /**
-   * Reads the first comma separated token of the compact id node and stores it in the cancel id node unless the cancel id is already at least as
-   * large.
-   *
-   * @return the {@link FinishCancelCompaction} step
-   */
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    String compactIdPath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
-        + Constants.ZTABLE_COMPACT_ID;
-    String cancelIdPath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
-        + Constants.ZTABLE_COMPACT_CANCEL_ID;
-
-    IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance();
-
-    // the node value is a comma separated list whose first token is the id
-    String[] parts = new String(zk.getData(compactIdPath, null)).split(",");
-    final long latestCompactId = Long.parseLong(parts[0]);
-
-    zk.mutate(cancelIdPath, null, null, new Mutator() {
-      @Override
-      public byte[] mutate(byte[] currentValue) throws Exception {
-        long storedCancelId = Long.parseLong(new String(currentValue));
-        // never lower the cancel id
-        long newCancelId = Math.max(storedCancelId, latestCompactId);
-        return (newCancelId + "").getBytes();
-      }
-    });
-
-    return new FinishCancelCompaction(tableId);
-  }
-
-  /** Releases the table reservation taken in {@link #isReady(long, Master)}. */
-  @Override
-  public void undo(long tid, Master environment) throws Exception {
-    Utils.unreserveTable(tableId, tid, false);
-  }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java b/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java
deleted file mode 100644
index 8988135..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.tableOps;
-
-import org.apache.accumulo.core.client.impl.thrift.TableOperation;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.master.state.tables.TableManager;
-import org.apache.log4j.Logger;
-
-/**
- * Table operation that brings a table online or takes it offline.
- */
-public class ChangeTableState extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-  private String tableId;
-  private TableOperation top;
-
-  /**
-   * @param tableId
-   *          id of the table whose state changes
-   * @param top
-   *          either {@link TableOperation#ONLINE} or {@link TableOperation#OFFLINE}
-   * @throws IllegalArgumentException
-   *           for any other operation
-   */
-  public ChangeTableState(String tableId, TableOperation top) {
-    this.tableId = tableId;
-    this.top = top;
-
-    boolean supported = top == TableOperation.ONLINE || top == TableOperation.OFFLINE;
-    if (!supported)
-      throw new IllegalArgumentException(top.toString());
-  }
-
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    // reserve the table so that this op does not run concurrently with create, clone, or delete table
-    long reservation = Utils.reserveTable(tableId, tid, true, true, top);
-    return reservation;
-  }
-
-  /**
-   * Transitions the table to the requested state, releases the reservation and publishes an event.
-   *
-   * @return always {@code null}; there is no further step
-   */
-  @Override
-  public Repo<Master> call(long tid, Master env) throws Exception {
-    final TableState target = (top == TableOperation.OFFLINE) ? TableState.OFFLINE : TableState.ONLINE;
-
-    TableManager.getInstance().transitionTableState(tableId, target);
-    Utils.unreserveTable(tableId, tid, true);
-
-    Logger stateLog = Logger.getLogger(ChangeTableState.class);
-    stateLog.debug("Changed table state " + tableId + " " + target);
-    env.getEventCoordinator().event("Set table state of %s to %s", tableId, target);
-    return null;
-  }
-
-  /** Releases the table reservation taken in {@link #isReady(long, Master)}. */
-  @Override
-  public void undo(long tid, Master env) throws Exception {
-    Utils.unreserveTable(tableId, tid, true);
-  }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java b/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java
deleted file mode 100644
index c916d07..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.tableOps;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.TableOperation;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.master.state.tables.TableManager;
-import org.apache.accumulo.server.security.AuditedSecurityOperation;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Serializable bag of values shared by the steps of a clone table operation. It is created and filled in by {@link CloneTable} and handed from step
- * to step.
- */
-class CloneInfo implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // id of the table being cloned
- String srcTableId;
- // name of the new table
- String tableName;
- // id of the new table; not set by the CloneTable constructor, assigned later in CloneTable.call
- String tableId;
- // handed to TableManager.cloneTable together with propertiesToExclude
- Map<String,String> propertiesToSet;
- Set<String> propertiesToExclude;
-
- // user that is granted every table permission on the new table
- public String user;
-}
-
-/**
- * Last step of a clone: brings the new table online and releases the reservations on both the source and the new table.
- */
-class FinishCloneTable extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-  private CloneInfo cloneInfo;
-
-  public FinishCloneTable(CloneInfo cloneInfo) {
-    this.cloneInfo = cloneInfo;
-  }
-
-  /** Always ready. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return 0;
-  }
-
-  /**
-   * @return always {@code null}; there is no further step
-   */
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    // directories are intentionally not created.... this is done because directories should be unique
-    // because they occupy a different namespace than normal tablet directories... also some clones
-    // may never create files.. therefore there is no need to consume namenode space w/ directories
-    // that are not used... tablet will create directories as needed
-
-    final CloneInfo info = cloneInfo;
-
-    TableManager.getInstance().transitionTableState(info.tableId, TableState.ONLINE);
-
-    // shared reservation on the source, exclusive reservation on the clone
-    Utils.unreserveTable(info.srcTableId, tid, false);
-    Utils.unreserveTable(info.tableId, tid, true);
-
-    environment.getEventCoordinator().event("Cloned table %s from %s", info.tableName, info.srcTableId);
-
-    Logger cloneLog = Logger.getLogger(FinishCloneTable.class);
-    cloneLog.debug("Cloned table " + info.srcTableId + " " + info.tableId + " " + info.tableName);
-
-    return null;
-  }
-
-  /** Nothing to roll back. */
-  @Override
-  public void undo(long tid, Master environment) throws Exception {}
-
-}
-
-/**
- * Clone step that copies the source table's metadata entries to the new table id.
- */
-class CloneMetadata extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-  private CloneInfo cloneInfo;
-
-  public CloneMetadata(CloneInfo cloneInfo) {
-    this.cloneInfo = cloneInfo;
-  }
-
-  /** Always ready. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return 0;
-  }
-
-  /**
-   * Removes any metadata entries already present for the new table id, then clones the source table's metadata.
-   *
-   * @return the {@link FinishCloneTable} step
-   */
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    Logger metaLog = Logger.getLogger(CloneMetadata.class);
-    metaLog.info("Cloning " + cloneInfo.tableName + " with tableId " + cloneInfo.tableId + " from srcTableId " + cloneInfo.srcTableId);
-
-    Instance zooInstance = HdfsZooInstance.getInstance();
-
-    // need to clear out any metadata entries for tableId just in case this
-    // died before and is executing again
-    removeCloneEntries(environment);
-    MetadataTableUtil.cloneTable(zooInstance, cloneInfo.srcTableId, cloneInfo.tableId, environment.getFileSystem());
-
-    return new FinishCloneTable(cloneInfo);
-  }
-
-  /** Deletes the metadata entries of the new table again. */
-  @Override
-  public void undo(long tid, Master environment) throws Exception {
-    removeCloneEntries(environment);
-  }
-
-  // Deletes the metadata entries stored under the new table id.
-  private void removeCloneEntries(Master environment) throws Exception {
-    MetadataTableUtil.deleteTable(cloneInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
-  }
-
-}
-
-/**
- * Clone step that registers the new table with the {@link TableManager}, guarded by the table name lock.
- */
-class CloneZookeeper extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private CloneInfo cloneInfo;
-
-  public CloneZookeeper(CloneInfo cloneInfo) {
-    this.cloneInfo = cloneInfo;
-  }
-
-  /**
-   * Reserves the new table id for {@link TableOperation#CLONE}.
-   *
-   * @return whatever {@code Utils.reserveTable} reports
-   */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    long reservation = Utils.reserveTable(cloneInfo.tableId, tid, true, false, TableOperation.CLONE);
-    return reservation;
-  }
-
-  /**
-   * Checks that the table name is not taken, clones the table through the {@link TableManager} and clears the table cache.
-   *
-   * @return the {@link CloneMetadata} step
-   */
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    final CloneInfo info = cloneInfo;
-
-    Utils.tableNameLock.lock();
-    try {
-      // write tableName & tableId to zookeeper
-      Instance zooInstance = HdfsZooInstance.getInstance();
-
-      Utils.checkTableDoesNotExist(zooInstance, info.tableName, info.tableId, TableOperation.CLONE);
-
-      TableManager manager = TableManager.getInstance();
-      manager.cloneTable(info.srcTableId, info.tableId, info.tableName, info.propertiesToSet, info.propertiesToExclude, NodeExistsPolicy.OVERWRITE);
-      Tables.clearCache(zooInstance);
-      return new CloneMetadata(info);
-    } finally {
-      Utils.tableNameLock.unlock();
-    }
-  }
-
-  /** Removes the new table again, releases its reservation and clears the table cache. */
-  @Override
-  public void undo(long tid, Master environment) throws Exception {
-    Instance zooInstance = HdfsZooInstance.getInstance();
-    TableManager.getInstance().removeTable(cloneInfo.tableId);
-    Utils.unreserveTable(cloneInfo.tableId, tid, true);
-    Tables.clearCache(zooInstance);
-  }
-
-}
-
-/**
- * Clone step that grants the requesting user every table permission on the new table.
- */
-class ClonePermissions extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private CloneInfo cloneInfo;
-
-  public ClonePermissions(CloneInfo cloneInfo) {
-    this.cloneInfo = cloneInfo;
-  }
-
-  /** Always ready. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return 0;
-  }
-
-  /**
-   * @return the {@link CloneZookeeper} step
-   * @throws ThriftSecurityException
-   *           if a grant is rejected; the failure is logged and rethrown, and no further permissions are granted
-   */
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    // give all table permissions to the creator
-    try {
-      for (TablePermission permission : TablePermission.values()) {
-        AuditedSecurityOperation.getInstance().grantTablePermission(SystemCredentials.get().toThrift(environment.getInstance()), cloneInfo.user,
-            cloneInfo.tableId, permission);
-      }
-    } catch (ThriftSecurityException e) {
-      Logger.getLogger(FinishCloneTable.class).error(e.getMessage(), e);
-      throw e;
-    }
-
-    // setup permissions in zookeeper before table info in zookeeper
-    // this way concurrent users will not get a spurious permission denied
-    // error
-    return new CloneZookeeper(cloneInfo);
-  }
-
-  /** Removes the new table from the security operation again. */
-  @Override
-  public void undo(long tid, Master environment) throws Exception {
-    AuditedSecurityOperation security = AuditedSecurityOperation.getInstance();
-    security.deleteTable(SystemCredentials.get().toThrift(environment.getInstance()), cloneInfo.tableId);
-  }
-}
-
-/**
- * Entry point of the clone table operation: reserves the source table, allocates the id of the new table and hands over to
- * {@link ClonePermissions}.
- */
-public class CloneTable extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-  private CloneInfo cloneInfo;
-
-  /**
-   * @param user
-   *          user requesting the clone
-   * @param srcTableId
-   *          id of the table to clone
-   * @param tableName
-   *          name of the new table
-   * @param propertiesToSet
-   *          stored in the clone info as the properties to set
-   * @param propertiesToExclude
-   *          stored in the clone info as the properties to exclude
-   */
-  public CloneTable(String user, String srcTableId, String tableName, Map<String,String> propertiesToSet, Set<String> propertiesToExclude) {
-    CloneInfo info = new CloneInfo();
-    info.user = user;
-    info.srcTableId = srcTableId;
-    info.tableName = tableName;
-    info.propertiesToExclude = propertiesToExclude;
-    info.propertiesToSet = propertiesToSet;
-    cloneInfo = info;
-  }
-
-  /**
-   * Reserves the source table for {@link TableOperation#CLONE}.
-   *
-   * @return whatever {@code Utils.reserveTable} reports
-   */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    long reservation = Utils.reserveTable(cloneInfo.srcTableId, tid, false, true, TableOperation.CLONE);
-    return reservation;
-  }
-
-  /**
-   * Allocates the id of the new table while holding the id lock.
-   *
-   * @return the {@link ClonePermissions} step
-   */
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    Utils.idLock.lock();
-    try {
-      Instance zooInstance = HdfsZooInstance.getInstance();
-      String newTableId = Utils.getNextTableId(cloneInfo.tableName, zooInstance);
-      cloneInfo.tableId = newTableId;
-      return new ClonePermissions(cloneInfo);
-    } finally {
-      Utils.idLock.unlock();
-    }
-  }
-
-  /** Releases the reservation on the source table. */
-  @Override
-  public void undo(long tid, Master environment) throws Exception {
-    Utils.unreserveTable(cloneInfo.srcTableId, tid, false);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java b/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
deleted file mode 100644
index e162cc7..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.tableOps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.TableOperation;
-import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.MapCounter;
-import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-
-/**
- * Step of a compaction operation that waits until every tablet in the requested row range reports a compaction id of at least {@code compactId}, asking
- * the tablet servers that host lagging tablets to compact while it waits.
- */
-class CompactionDriver extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private long compactId;
- private String tableId;
- private byte[] startRow;
- private byte[] endRow;
-
- /**
- * @param compactId
- * compaction id that every tablet in the range has to reach
- * @param tableId
- * id of the table being compacted
- * @param startRow
- * start of the row range, or null
- * @param endRow
- * end of the row range, or null
- */
- public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) {
-
- this.compactId = compactId;
- this.tableId = tableId;
- this.startRow = startRow;
- this.endRow = endRow;
- }
-
- /**
- * Scans the metadata entries of the table and counts the tablets whose compaction id is still below {@code compactId}.
- *
- * @return 0 when no tablet is left to wait for; otherwise a wait time between retries (presumably milliseconds -- TODO confirm) of 500 times the
- * largest number of lagging tablets on a single server, but at least twice the scan time and at most 30000
- * @throws ThriftTableOperationException
- * if the compaction was canceled, the table does not exist, or the table is offline and no server hosts a lagging tablet
- */
- @Override
- public long isReady(long tid, Master master) throws Exception {
-
- String zCancelID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
- + Constants.ZTABLE_COMPACT_CANCEL_ID;
-
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
-
- // a cancel id at or above our compaction id means this compaction has been canceled
- if (Long.parseLong(new String(zoo.getData(zCancelID, null))) >= compactId) {
- // compaction was canceled
- throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled");
- }
-
- // number of lagging tablets per tablet server
- MapCounter<TServerInstance> serversToFlush = new MapCounter<TServerInstance>();
- Connector conn = master.getConnector();
- Scanner scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-
- // the scan starts at startRow; the end of the range is enforced by the break at the bottom of the loop
- Range range = new KeyExtent(new Text(tableId), null, startRow == null ? null : new Text(startRow)).toMetadataRange();
-
- // when compacting the metadata table itself, restrict the range to entries after the root table's metadata entry
- if (tableId.equals(MetadataTable.ID))
- range = range.clip(new Range(RootTable.EXTENT.getMetadataEntry(), false, null, true));
-
- scanner.setRange(range);
- TabletsSection.ServerColumnFamily.COMPACT_COLUMN.fetch(scanner);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
-
- long t1 = System.currentTimeMillis();
- RowIterator ri = new RowIterator(scanner);
-
- int tabletsToWaitFor = 0;
- int tabletCount = 0;
-
- // one metadata row per tablet
- while (ri.hasNext()) {
- Iterator<Entry<Key,Value>> row = ri.next();
- // stays -1 when the tablet has no compact column, so such a tablet counts as lagging
- long tabletCompactID = -1;
-
- TServerInstance server = null;
-
- Entry<Key,Value> entry = null;
- while (row.hasNext()) {
- entry = row.next();
- Key key = entry.getKey();
-
- if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
- tabletCompactID = Long.parseLong(entry.getValue().toString());
-
- if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily()))
- server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
- }
-
- if (tabletCompactID < compactId) {
- tabletsToWaitFor++;
- // a tablet without a current location is waited for, but there is no server to ask
- if (server != null)
- serversToFlush.increment(server, 1);
- }
-
- tabletCount++;
-
- // stop at the tablet with no end row or once the tablet's end row reaches the requested end row
- // NOTE(review): relies on each row yielding at least one entry, otherwise entry is null here
- Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow();
- if (tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >= 0))
- break;
- }
-
- long scanTime = System.currentTimeMillis() - t1;
-
- Instance instance = master.getInstance();
- Tables.clearCache(instance);
- // no tablets seen and the table is gone
- if (tabletCount == 0 && !Tables.exists(instance, tableId))
- throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
-
- // no hosted tablet is lagging and the table is offline
- if (serversToFlush.size() == 0 && Tables.getTableState(instance, tableId) == TableState.OFFLINE)
- throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, null);
-
- if (tabletsToWaitFor == 0)
- return 0;
-
- // ask every server that hosts a lagging tablet to compact; a failed request is logged and retried on the next call
- for (TServerInstance tsi : serversToFlush.keySet()) {
- try {
- final TServerConnection server = master.getConnection(tsi);
- if (server != null)
- server.compact(master.getMasterLock(), tableId, startRow, endRow);
- } catch (TException ex) {
- Logger.getLogger(CompactionDriver.class).error(ex.toString());
- }
- }
-
- long sleepTime = 500;
-
- if (serversToFlush.size() > 0)
- sleepTime = Collections.max(serversToFlush.values()) * sleepTime; // make wait time depend on the server with the most to
- // compact
-
- // never poll faster than twice the time the scan itself took
- sleepTime = Math.max(2 * scanTime, sleepTime);
-
- // upper bound on the wait
- sleepTime = Math.min(sleepTime, 30000);
-
- return sleepTime;
- }
-
- /**
- * Removes the iterators registered for this transaction and releases the read lock on the table.
- *
- * @return always null; there is no further step
- */
- @Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
- CompactRange.removeIterators(tid, tableId);
- Utils.getReadLock(tableId, tid).unlock();
- return null;
- }
-
- /** Nothing to roll back. */
- @Override
- public void undo(long tid, Master environment) throws Exception {
-
- }
-
-}
-
-public class CompactRange extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
- private String tableId;
- private byte[] startRow;
- private byte[] endRow;
- private byte[] iterators;
-
- /**
-  * Writable holding the row range of a compaction together with the iterators to apply.
-  * <p>
-  * Serialized form: for the start row and then the end row a presence flag followed, when present, by length and bytes; then the iterator count
-  * followed by each iterator setting.
-  */
- public static class CompactionIterators implements Writable {
-   byte[] startRow;
-   byte[] endRow;
-   List<IteratorSetting> iterators;
-
-   public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) {
-     this.startRow = startRow;
-     this.endRow = endRow;
-     this.iterators = iterators;
-   }
-
-   /** Creates an instance with no row bounds and no iterators. */
-   public CompactionIterators() {
-     this(null, null, Collections.<IteratorSetting> emptyList());
-   }
-
-   @Override
-   public void write(DataOutput out) throws IOException {
-     writeOptionalRow(out, startRow);
-     writeOptionalRow(out, endRow);
-
-     out.writeInt(iterators.size());
-     for (IteratorSetting setting : iterators) {
-       setting.write(out);
-     }
-   }
-
-   @Override
-   public void readFields(DataInput in) throws IOException {
-     startRow = readOptionalRow(in);
-     endRow = readOptionalRow(in);
-
-     int remaining = in.readInt();
-     iterators = new ArrayList<IteratorSetting>(remaining);
-     while (remaining-- > 0) {
-       iterators.add(new IteratorSetting(in));
-     }
-   }
-
-   // Writes a presence flag and, for a non-null row, its length and bytes.
-   private static void writeOptionalRow(DataOutput out, byte[] row) throws IOException {
-     boolean present = row != null;
-     out.writeBoolean(present);
-     if (present) {
-       out.writeInt(row.length);
-       out.write(row);
-     }
-   }
-
-   // Reads what writeOptionalRow wrote; returns null when the presence flag is false.
-   private static byte[] readOptionalRow(DataInput in) throws IOException {
-     if (!in.readBoolean())
-       return null;
-     byte[] row = new byte[in.readInt()];
-     in.readFully(row);
-     return row;
-   }
-
-   /** @return the end row as Text, or null when there is none */
-   public Text getEndRow() {
-     return endRow == null ? null : new Text(endRow);
-   }
-
-   /** @return the start row as Text, or null when there is none */
-   public Text getStartRow() {
-     return startRow == null ? null : new Text(startRow);
-   }
-
-   public List<IteratorSetting> getIterators() {
-     return iterators;
-   }
- }
-
- public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
- this.tableId = tableId;
- this.startRow = startRow.length == 0 ? null : startRow;
- this.endRow = endRow.length == 0 ? null : endRow;
-
- if (iterators.size() > 0) {
- this.iterators = WritableUtils.toByteArray(new CompactionIterators(this.startRow, this.endRow, iterators));
- } else {
- iterators = null;
- }
-
- if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
- throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
- "start row must be less than end row");
- }
-
- @Override
- public long isReady(long tid, Master environment) throws Exception {
- return Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT);
- }
-
- @Override
- public Repo<Master> call(final long tid, Master environment) throws Exception {
- String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
-
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
- byte[] cid;
- try {
- cid = zoo.mutate(zTablePath, null, null, new Mutator() {
- @Override
- public byte[] mutate(byte[] currentValue) throws Exception {
- String cvs = new String(currentValue);
- String[] tokens = cvs.split(",");
- long flushID = Long.parseLong(new String(tokens[0]));
- flushID++;
-
- String txidString = String.format("%016x", tid);
-
- for (int i = 1; i < tokens.length; i++) {
- if (tokens[i].startsWith(txidString))
- continue; // skip self
-
- throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
- "Another compaction with iterators is running");
- }
-
- StringBuilder encodedIterators = new StringBuilder();
-
- if (iterators != null) {
- Hex hex = new Hex();
- encodedIterators.append(",");
- encodedIterators.append(txidString);
- encodedIterators.append("=");
- encodedIterators.append(new String(hex.encode(iterators)));
- }
-
- return ("" + flushID + encodedIterators).getBytes();
- }
- });
-
- return new CompactionDriver(Long.parseLong(new String(cid).split(",")[0]), tableId, startRow, endRow);
- } catch (NoNodeException nne) {
- throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
- }
-
- }
-
- static void removeIterators(final long txid, String tableId) throws Exception {
- String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
-
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
-
- zoo.mutate(zTablePath, null, null, new Mutator() {
- @Override
- public byte[] mutate(byte[] currentValue) throws Exception {
- String cvs = new String(currentValue);
- String[] tokens = cvs.split(",");
- long flushID = Long.parseLong(new String(tokens[0]));
-
- String txidString = String.format("%016x", txid);
-
- StringBuilder encodedIterators = new StringBuilder();
- for (int i = 1; i < tokens.length; i++) {
- if (tokens[i].startsWith(txidString))
- continue;
- encodedIterators.append(",");
- encodedIterators.append(tokens[i]);
- }
-
- return ("" + flushID + encodedIterators).getBytes();
- }
- });
-
- }
-
- @Override
- public void undo(long tid, Master environment) throws Exception {
- try {
- removeIterators(tid, tableId);
- } finally {
- Utils.unreserveTable(tableId, tid, false);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java b/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
deleted file mode 100644
index 8bd2c3d..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.tableOps;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.admin.TimeType;
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.TableOperation;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.master.state.tables.TableManager;
-import org.apache.accumulo.server.security.AuditedSecurityOperation;
-import org.apache.accumulo.server.security.SecurityOperation;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.server.util.TablePropUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-/**
- * Serializable bag of values passed from one table-creation step to the next.
- */
-class TableInfo implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-
-  // name requested for the new table
-  String tableName;
-  // id assigned in CreateTable.call via Utils.getNextTableId
-  String tableId;
-  // time type code produced by TabletTime.getTimeID
-  char timeType;
-  // user who is granted all table permissions in SetupPermissions
-  String user;
-
-  // properties written with TablePropUtil.setTableProperty in PopulateZookeeper
-  public Map<String,String> props;
-
-  // directory chosen in ChooseDir; stays null until that step runs
-  public String dir;
-}
-
-/**
- * Final step of table creation: transitions the table to ONLINE, releases the
- * table reservation, publishes a "Created table" event and logs the result.
- */
-class FinishCreateTable extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private TableInfo tableInfo;
-
-  public FinishCreateTable(TableInfo ti) {
-    tableInfo = ti;
-  }
-
-  /** Always returns 0. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return 0;
-  }
-
-  /**
-   * @return always null
-   */
-  @Override
-  public Repo<Master> call(long tid, Master env) throws Exception {
-    final TableInfo info = tableInfo;
-
-    TableManager.getInstance().transitionTableState(info.tableId, TableState.ONLINE);
-    Utils.unreserveTable(info.tableId, tid, true);
-    env.getEventCoordinator().event("Created table %s ", info.tableName);
-
-    final Logger log = Logger.getLogger(FinishCreateTable.class);
-    log.debug("Created table " + info.tableId + " " + info.tableName);
-
-    return null;
-  }
-
-  /** @return the id of the created table */
-  @Override
-  public String getReturn() {
-    return tableInfo.tableId;
-  }
-
-  /** No undo action is performed for this step. */
-  @Override
-  public void undo(long tid, Master env) throws Exception {
-    // intentionally empty
-  }
-
-}
-
-/**
- * Table-creation step that adds the new table's tablet entry through
- * MetadataTableUtil.addTablet and then continues with FinishCreateTable.
- */
-class PopulateMetadata extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private TableInfo tableInfo;
-
-  PopulateMetadata(TableInfo ti) {
-    tableInfo = ti;
-  }
-
-  /** Always returns 0. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return 0;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    // extent with both row arguments null; presumably the single initial tablet covering the whole table (TODO confirm)
-    final Text tableIdText = new Text(tableInfo.tableId);
-    final KeyExtent wholeTable = new KeyExtent(tableIdText, null, null);
-
-    MetadataTableUtil.addTablet(wholeTable, tableInfo.dir, SystemCredentials.get(), tableInfo.timeType, environment.getMasterLock());
-
-    return new FinishCreateTable(tableInfo);
-  }
-
-  /** Reverses this step by calling MetadataTableUtil.deleteTable for the table id. */
-  @Override
-  public void undo(long tid, Master environment) throws Exception {
-    MetadataTableUtil.deleteTable(tableInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
-  }
-
-}
-
-/**
- * Table-creation step that creates the directory stored in TableInfo.dir and
- * then continues with PopulateMetadata.
- */
-class CreateDir extends MasterRepo {
-  private static final long serialVersionUID = 1L;
-
-  private TableInfo tableInfo;
-
-  CreateDir(TableInfo ti) {
-    tableInfo = ti;
-  }
-
-  /** Always returns 0. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return 0;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    master.getFileSystem().mkdirs(new Path(tableInfo.dir));
-    return new PopulateMetadata(tableInfo);
-  }
-
-  /** Reverses this step by recursively deleting the directory stored in TableInfo.dir. */
-  @Override
-  public void undo(long tid, Master master) throws Exception {
-    master.getFileSystem().deleteRecursively(new Path(tableInfo.dir));
-  }
-}
-
-/**
- * Table-creation step that picks one of the configured tables directories,
- * stores the resulting tablet directory in TableInfo.dir and then continues
- * with CreateDir.
- */
-class ChooseDir extends MasterRepo {
-  private static final long serialVersionUID = 1L;
-
-  private TableInfo tableInfo;
-
-  ChooseDir(TableInfo ti) {
-    tableInfo = ti;
-  }
-
-  /** Always returns 0. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return 0;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    // Constants.DEFAULT_TABLET_LOCATION already begins with a slash, so none is inserted between it and the table id
-    final String tabletSuffix = "/" + tableInfo.tableId + Constants.DEFAULT_TABLET_LOCATION;
-    tableInfo.dir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + tabletSuffix;
-    return new CreateDir(tableInfo);
-  }
-
-  /** No undo action is performed for this step. */
-  @Override
-  public void undo(long tid, Master master) throws Exception {
-    // intentionally empty
-  }
-}
-
-/**
- * Table-creation step that registers the table name, id and properties
- * through TableManager and TablePropUtil, then continues with ChooseDir.
- */
-class PopulateZookeeper extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private TableInfo tableInfo;
-
-  PopulateZookeeper(TableInfo ti) {
-    tableInfo = ti;
-  }
-
-  /** Delegates to Utils.reserveTable for a CREATE operation and returns its result. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.CREATE);
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    // reserve the table name in zookeeper or fail; the whole sequence runs under Utils.tableNameLock
-    Utils.tableNameLock.lock();
-    try {
-      final Instance instance = master.getInstance();
-
-      Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE);
-
-      // write tableName & tableId to zookeeper
-      TableManager.getInstance().addTable(tableInfo.tableId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE);
-
-      for (Entry<String,String> prop : tableInfo.props.entrySet()) {
-        TablePropUtil.setTableProperty(tableInfo.tableId, prop.getKey(), prop.getValue());
-      }
-
-      Tables.clearCache(instance);
-      return new ChooseDir(tableInfo);
-    } finally {
-      Utils.tableNameLock.unlock();
-    }
-  }
-
-  /** Reverses this step: removes the table, releases its reservation and clears the Tables cache. */
-  @Override
-  public void undo(long tid, Master master) throws Exception {
-    final Instance instance = master.getInstance();
-    TableManager.getInstance().removeTable(tableInfo.tableId);
-    Utils.unreserveTable(tableInfo.tableId, tid, true);
-    Tables.clearCache(instance);
-  }
-
-}
-
-/**
- * Table-creation step that grants every TablePermission on the new table to
- * the creating user and then continues with PopulateZookeeper.
- */
-class SetupPermissions extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private TableInfo tableInfo;
-
-  public SetupPermissions(TableInfo ti) {
-    this.tableInfo = ti;
-  }
-
-  /**
-   * @throws ThriftSecurityException
-   *           when granting a permission fails; the error is logged and rethrown
-   */
-  @Override
-  public Repo<Master> call(long tid, Master env) throws Exception {
-    // give all table permissions to the creator
-    SecurityOperation security = AuditedSecurityOperation.getInstance();
-    for (TablePermission permission : TablePermission.values()) {
-      try {
-        security.grantTablePermission(SystemCredentials.get().toThrift(env.getInstance()), tableInfo.user, tableInfo.tableId, permission);
-      } catch (ThriftSecurityException e) {
-        // log under this step's own class so the failure is attributed to the right step
-        Logger.getLogger(SetupPermissions.class).error(e.getMessage(), e);
-        throw e;
-      }
-    }
-
-    // setup permissions in zookeeper before table info in zookeeper
-    // this way concurrent users will not get a spurious permission denied
-    // error
-    return new PopulateZookeeper(tableInfo);
-  }
-
-  /** Reverses this step by calling deleteTable on the security operation for the table id. */
-  @Override
-  public void undo(long tid, Master env) throws Exception {
-    AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(env.getInstance()), tableInfo.tableId);
-  }
-
-}
-
-/**
- * Entry point of table creation: allocates a table id and then continues with
- * SetupPermissions.
- */
-public class CreateTable extends MasterRepo {
-  private static final long serialVersionUID = 1L;
-
-  private TableInfo tableInfo;
-
-  /**
-   * @param user
-   *          user recorded as the table's creator
-   * @param tableName
-   *          name of the table to create
-   * @param timeType
-   *          time type, stored as the code returned by TabletTime.getTimeID
-   * @param props
-   *          table properties to set on the new table
-   */
-  public CreateTable(String user, String tableName, TimeType timeType, Map<String,String> props) {
-    final TableInfo info = new TableInfo();
-    info.tableName = tableName;
-    info.timeType = TabletTime.getTimeID(timeType);
-    info.user = user;
-    info.props = props;
-    this.tableInfo = info;
-  }
-
-  /** Always returns 0. */
-  @Override
-  public long isReady(long tid, Master environment) throws Exception {
-    return 0;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    // The first step is to reserve a table id. If the machine fails during this
-    // step it is ok to retry; the only side effect is that a table id may be
-    // skipped and never used.
-
-    // assuming only the master process is creating tables
-
-    Utils.idLock.lock();
-    try {
-      final String nextId = Utils.getNextTableId(tableInfo.tableName, master.getInstance());
-      tableInfo.tableId = nextId;
-      return new SetupPermissions(tableInfo);
-    } finally {
-      Utils.idLock.unlock();
-    }
-  }
-
-  /** No undo action is performed for this step. */
-  @Override
-  public void undo(long tid, Master env) throws Exception {
-    // nothing to do, the table id was allocated!
-  }
-
-}