Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:14 UTC
[35/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
new file mode 100644
index 0000000..8d906d6
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.log4j.Logger;
+
+class CloneInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ String srcTableId;
+ String tableName;
+ String tableId;
+ Map<String,String> propertiesToSet;
+ Set<String> propertiesToExclude;
+
+ public String user;
+}
+
+class FinishCloneTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+ private CloneInfo cloneInfo;
+
+ public FinishCloneTable(CloneInfo cloneInfo) {
+ this.cloneInfo = cloneInfo;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ // Directories are intentionally not created here. Clone directories must be unique because
+ // they occupy a different namespace than normal tablet directories, and some clones may never
+ // create files, so there is no need to consume namenode space with directories that are not
+ // used. Tablets will create their directories as needed.
+
+ TableManager.getInstance().transitionTableState(cloneInfo.tableId, TableState.ONLINE);
+
+ Utils.unreserveTable(cloneInfo.srcTableId, tid, false);
+ Utils.unreserveTable(cloneInfo.tableId, tid, true);
+
+ environment.getEventCoordinator().event("Cloned table %s from %s", cloneInfo.tableName, cloneInfo.srcTableId);
+
+ Logger.getLogger(FinishCloneTable.class).debug("Cloned table " + cloneInfo.srcTableId + " " + cloneInfo.tableId + " " + cloneInfo.tableName);
+
+ return null;
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {}
+
+}
+
+class CloneMetadata extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+ private CloneInfo cloneInfo;
+
+ public CloneMetadata(CloneInfo cloneInfo) {
+ this.cloneInfo = cloneInfo;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ Logger.getLogger(CloneMetadata.class).info(
+ String.format("Cloning %s with tableId %s from srcTableId %s", cloneInfo.tableName, cloneInfo.tableId, cloneInfo.srcTableId));
+ Instance instance = HdfsZooInstance.getInstance();
+ // need to clear out any metadata entries for tableId just in case this
+ // died before and is executing again
+ MetadataTableUtil.deleteTable(cloneInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
+ MetadataTableUtil.cloneTable(instance, cloneInfo.srcTableId, cloneInfo.tableId, environment.getFileSystem());
+ return new FinishCloneTable(cloneInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ MetadataTableUtil.deleteTable(cloneInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
+ }
+
+}
+
+class CloneZookeeper extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private CloneInfo cloneInfo;
+
+ public CloneZookeeper(CloneInfo cloneInfo) {
+ this.cloneInfo = cloneInfo;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveTable(cloneInfo.tableId, tid, true, false, TableOperation.CLONE);
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ Utils.tableNameLock.lock();
+ try {
+ // write tableName & tableId to zookeeper
+ Instance instance = HdfsZooInstance.getInstance();
+
+ Utils.checkTableDoesNotExist(instance, cloneInfo.tableName, cloneInfo.tableId, TableOperation.CLONE);
+
+ TableManager.getInstance().cloneTable(cloneInfo.srcTableId, cloneInfo.tableId, cloneInfo.tableName, cloneInfo.propertiesToSet,
+ cloneInfo.propertiesToExclude, NodeExistsPolicy.OVERWRITE);
+ Tables.clearCache(instance);
+ return new CloneMetadata(cloneInfo);
+ } finally {
+ Utils.tableNameLock.unlock();
+ }
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ Instance instance = HdfsZooInstance.getInstance();
+ TableManager.getInstance().removeTable(cloneInfo.tableId);
+ Utils.unreserveTable(cloneInfo.tableId, tid, true);
+ Tables.clearCache(instance);
+ }
+
+}
+
+class ClonePermissions extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private CloneInfo cloneInfo;
+
+ public ClonePermissions(CloneInfo cloneInfo) {
+ this.cloneInfo = cloneInfo;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ // give all table permissions to the creator
+ for (TablePermission permission : TablePermission.values()) {
+ try {
+ AuditedSecurityOperation.getInstance().grantTablePermission(SystemCredentials.get().toThrift(environment.getInstance()), cloneInfo.user,
+ cloneInfo.tableId, permission);
+ } catch (ThriftSecurityException e) {
+ Logger.getLogger(ClonePermissions.class).error(e.getMessage(), e);
+ throw e;
+ }
+ }
+
+ // set up permissions in zookeeper before the table info is written to zookeeper;
+ // this way concurrent users will not get a spurious permission denied error
+ return new CloneZookeeper(cloneInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(environment.getInstance()), cloneInfo.tableId);
+ }
+}
+
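+/**
+ * FATE operation that clones a table. The repo chain, in execution order, is:
+ * CloneTable -> ClonePermissions -> CloneZookeeper -> CloneMetadata -> FinishCloneTable.
+ */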
+public class CloneTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+ private CloneInfo cloneInfo;
+
+ public CloneTable(String user, String srcTableId, String tableName, Map<String,String> propertiesToSet, Set<String> propertiesToExclude) {
+ cloneInfo = new CloneInfo();
+ cloneInfo.user = user;
+ cloneInfo.srcTableId = srcTableId;
+ cloneInfo.tableName = tableName;
+ cloneInfo.propertiesToExclude = propertiesToExclude;
+ cloneInfo.propertiesToSet = propertiesToSet;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveTable(cloneInfo.srcTableId, tid, false, true, TableOperation.CLONE);
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+
+ Utils.idLock.lock();
+ try {
+ Instance instance = HdfsZooInstance.getInstance();
+ cloneInfo.tableId = Utils.getNextTableId(cloneInfo.tableName, instance);
+ return new ClonePermissions(cloneInfo);
+ } finally {
+ Utils.idLock.unlock();
+ }
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ Utils.unreserveTable(cloneInfo.srcTableId, tid, false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
new file mode 100644
index 0000000..160fc7e
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.tableOps.CompactionIterators;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
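+/**
+ * Waits for all tablets in the range to be compacted up to compactId. isReady() scans the
+ * metadata table for tablets that have not yet compacted, asks their hosting tablet servers to
+ * compact, and returns a back-off time; once all tablets have caught up, call() removes this
+ * compaction's iterator configuration from zookeeper.
+ */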
+class CompactionDriver extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private long compactId;
+ private String tableId;
+ private byte[] startRow;
+ private byte[] endRow;
+
+ public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) {
+
+ this.compactId = compactId;
+ this.tableId = tableId;
+ this.startRow = startRow;
+ this.endRow = endRow;
+ }
+
+ @Override
+ public long isReady(long tid, Master master) throws Exception {
+
+ String zCancelID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
+ + Constants.ZTABLE_COMPACT_CANCEL_ID;
+
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+ if (Long.parseLong(new String(zoo.getData(zCancelID, null))) >= compactId) {
+ // compaction was canceled
+ throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled");
+ }
+
+ MapCounter<TServerInstance> serversToFlush = new MapCounter<TServerInstance>();
+ Connector conn = master.getConnector();
+ Scanner scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
+
+ Range range = new KeyExtent(new Text(tableId), null, startRow == null ? null : new Text(startRow)).toMetadataRange();
+
+ if (tableId.equals(MetadataTable.ID))
+ range = range.clip(new Range(RootTable.EXTENT.getMetadataEntry(), false, null, true));
+
+ scanner.setRange(range);
+ TabletsSection.ServerColumnFamily.COMPACT_COLUMN.fetch(scanner);
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+
+ long t1 = System.currentTimeMillis();
+ RowIterator ri = new RowIterator(scanner);
+
+ int tabletsToWaitFor = 0;
+ int tabletCount = 0;
+
+ while (ri.hasNext()) {
+ Iterator<Entry<Key,Value>> row = ri.next();
+ long tabletCompactID = -1;
+
+ TServerInstance server = null;
+
+ Entry<Key,Value> entry = null;
+ while (row.hasNext()) {
+ entry = row.next();
+ Key key = entry.getKey();
+
+ if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
+ tabletCompactID = Long.parseLong(entry.getValue().toString());
+
+ if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily()))
+ server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
+ }
+
+ if (tabletCompactID < compactId) {
+ tabletsToWaitFor++;
+ if (server != null)
+ serversToFlush.increment(server, 1);
+ }
+
+ tabletCount++;
+
+ Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow();
+ if (tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >= 0))
+ break;
+ }
+
+ long scanTime = System.currentTimeMillis() - t1;
+
+ Instance instance = master.getInstance();
+ Tables.clearCache(instance);
+ if (tabletCount == 0 && !Tables.exists(instance, tableId))
+ throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
+
+ if (serversToFlush.size() == 0 && Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+ throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, null);
+
+ if (tabletsToWaitFor == 0)
+ return 0;
+
+ for (TServerInstance tsi : serversToFlush.keySet()) {
+ try {
+ final TServerConnection server = master.getConnection(tsi);
+ if (server != null)
+ server.compact(master.getMasterLock(), tableId, startRow, endRow);
+ } catch (TException ex) {
+ Logger.getLogger(CompactionDriver.class).error(ex.toString());
+ }
+ }
+
+ long sleepTime = 500;
+
+ // make the wait time depend on the server with the most tablets left to compact
+ if (serversToFlush.size() > 0)
+ sleepTime = Collections.max(serversToFlush.values()) * sleepTime;
+
+ sleepTime = Math.max(2 * scanTime, sleepTime);
+
+ sleepTime = Math.min(sleepTime, 30000);
+
+ return sleepTime;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ CompactRange.removeIterators(tid, tableId);
+ Utils.getReadLock(tableId, tid).unlock();
+ return null;
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+
+ }
+
+}
+
+public class CompactRange extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+ private String tableId;
+ private byte[] startRow;
+ private byte[] endRow;
+ private byte[] iterators;
+
+ public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
+ this.tableId = tableId;
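+ // an empty start or end row indicates the range is unbounded on that side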
+ this.startRow = startRow.length == 0 ? null : startRow;
+ this.endRow = endRow.length == 0 ? null : endRow;
+
+ if (iterators.size() > 0) {
+ this.iterators = WritableUtils.toByteArray(new CompactionIterators(this.startRow, this.endRow, iterators));
+ } else {
+ this.iterators = null;
+ }
+
+ if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
+ throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
+ "start row must be less than end row");
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT);
+ }
+
+ @Override
+ public Repo<Master> call(final long tid, Master environment) throws Exception {
+ String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
+
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ byte[] cid;
+ try {
+ cid = zoo.mutate(zTablePath, null, null, new Mutator() {
+ @Override
+ public byte[] mutate(byte[] currentValue) throws Exception {
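+ // current value format: <compactID>[,<txID>=<hex encoded iterators>]...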
+ String cvs = new String(currentValue);
+ String[] tokens = cvs.split(",");
+ long flushID = Long.parseLong(tokens[0]);
+ flushID++;
+
+ String txidString = String.format("%016x", tid);
+
+ for (int i = 1; i < tokens.length; i++) {
+ if (tokens[i].startsWith(txidString))
+ continue; // skip self
+
+ throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
+ "Another compaction with iterators is running");
+ }
+
+ StringBuilder encodedIterators = new StringBuilder();
+
+ if (iterators != null) {
+ Hex hex = new Hex();
+ encodedIterators.append(",");
+ encodedIterators.append(txidString);
+ encodedIterators.append("=");
+ encodedIterators.append(new String(hex.encode(iterators)));
+ }
+
+ return ("" + flushID + encodedIterators).getBytes();
+ }
+ });
+
+ return new CompactionDriver(Long.parseLong(new String(cid).split(",")[0]), tableId, startRow, endRow);
+ } catch (NoNodeException nne) {
+ throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
+ }
+
+ }
+
+ static void removeIterators(final long txid, String tableId) throws Exception {
+ String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
+
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+ zoo.mutate(zTablePath, null, null, new Mutator() {
+ @Override
+ public byte[] mutate(byte[] currentValue) throws Exception {
+ String cvs = new String(currentValue);
+ String[] tokens = cvs.split(",");
+ long flushID = Long.parseLong(tokens[0]);
+
+ String txidString = String.format("%016x", txid);
+
+ StringBuilder encodedIterators = new StringBuilder();
+ for (int i = 1; i < tokens.length; i++) {
+ if (tokens[i].startsWith(txidString))
+ continue;
+ encodedIterators.append(",");
+ encodedIterators.append(tokens[i]);
+ }
+
+ return ("" + flushID + encodedIterators).getBytes();
+ }
+ });
+
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ try {
+ removeIterators(tid, tableId);
+ } finally {
+ Utils.unreserveTable(tableId, tid, false);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java
new file mode 100644
index 0000000..df2e028
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.TablePropUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+class TableInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ String tableName;
+ String tableId;
+ char timeType;
+ String user;
+
+ public Map<String,String> props;
+
+ public String dir = null;
+}
+
+class FinishCreateTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private TableInfo tableInfo;
+
+ public FinishCreateTable(TableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master env) throws Exception {
+ TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
+
+ Utils.unreserveTable(tableInfo.tableId, tid, true);
+
+ env.getEventCoordinator().event("Created table %s ", tableInfo.tableName);
+
+ Logger.getLogger(FinishCreateTable.class).debug("Created table " + tableInfo.tableId + " " + tableInfo.tableName);
+
+ return null;
+ }
+
+ @Override
+ public String getReturn() {
+ return tableInfo.tableId;
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {}
+
+}
+
+class PopulateMetadata extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private TableInfo tableInfo;
+
+ PopulateMetadata(TableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ KeyExtent extent = new KeyExtent(new Text(tableInfo.tableId), null, null);
+ MetadataTableUtil.addTablet(extent, tableInfo.dir, SystemCredentials.get(), tableInfo.timeType, environment.getMasterLock());
+
+ return new FinishCreateTable(tableInfo);
+
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ MetadataTableUtil.deleteTable(tableInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
+ }
+
+}
+
+class CreateDir extends MasterRepo {
+ private static final long serialVersionUID = 1L;
+
+ private TableInfo tableInfo;
+
+ CreateDir(TableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ VolumeManager fs = master.getFileSystem();
+ fs.mkdirs(new Path(tableInfo.dir));
+ return new PopulateMetadata(tableInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master master) throws Exception {
+ VolumeManager fs = master.getFileSystem();
+ fs.deleteRecursively(new Path(tableInfo.dir));
+
+ }
+}
+
+class ChooseDir extends MasterRepo {
+ private static final long serialVersionUID = 1L;
+
+ private TableInfo tableInfo;
+
+ ChooseDir(TableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ // Constants.DEFAULT_TABLET_LOCATION has a leading slash prepended to it so we don't need to add one here
+ tableInfo.dir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + "/" + tableInfo.tableId + Constants.DEFAULT_TABLET_LOCATION;
+ return new CreateDir(tableInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master master) throws Exception {
+
+ }
+}
+
+class PopulateZookeeper extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private TableInfo tableInfo;
+
+ PopulateZookeeper(TableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.CREATE);
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ // reserve the table name in zookeeper or fail
+
+ Utils.tableNameLock.lock();
+ try {
+ // write tableName & tableId to zookeeper
+ Instance instance = master.getInstance();
+
+ Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE);
+
+ TableManager.getInstance().addTable(tableInfo.tableId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE);
+
+ for (Entry<String,String> entry : tableInfo.props.entrySet())
+ TablePropUtil.setTableProperty(tableInfo.tableId, entry.getKey(), entry.getValue());
+
+ Tables.clearCache(instance);
+ return new ChooseDir(tableInfo);
+ } finally {
+ Utils.tableNameLock.unlock();
+ }
+
+ }
+
+ @Override
+ public void undo(long tid, Master master) throws Exception {
+ Instance instance = master.getInstance();
+ TableManager.getInstance().removeTable(tableInfo.tableId);
+ Utils.unreserveTable(tableInfo.tableId, tid, true);
+ Tables.clearCache(instance);
+ }
+
+}
+
+class SetupPermissions extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private TableInfo tableInfo;
+
+ public SetupPermissions(TableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master env) throws Exception {
+ // give all table permissions to the creator
+ SecurityOperation security = AuditedSecurityOperation.getInstance();
+ for (TablePermission permission : TablePermission.values()) {
+ try {
+ security.grantTablePermission(SystemCredentials.get().toThrift(env.getInstance()), tableInfo.user, tableInfo.tableId, permission);
+ } catch (ThriftSecurityException e) {
+ Logger.getLogger(SetupPermissions.class).error(e.getMessage(), e);
+ throw e;
+ }
+ }
+
+ // set up permissions in zookeeper before the table info is written to zookeeper;
+ // this way concurrent users will not get a spurious permission denied error
+ return new PopulateZookeeper(tableInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(env.getInstance()), tableInfo.tableId);
+ }
+
+}
+
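+/**
+ * FATE operation that creates a table. The repo chain, in execution order, is:
+ * CreateTable -> SetupPermissions -> PopulateZookeeper -> ChooseDir -> CreateDir ->
+ * PopulateMetadata -> FinishCreateTable.
+ */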
+public class CreateTable extends MasterRepo {
+ private static final long serialVersionUID = 1L;
+
+ private TableInfo tableInfo;
+
+ public CreateTable(String user, String tableName, TimeType timeType, Map<String,String> props) {
+ tableInfo = new TableInfo();
+ tableInfo.tableName = tableName;
+ tableInfo.timeType = TabletTime.getTimeID(timeType);
+ tableInfo.user = user;
+ tableInfo.props = props;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ // the first step is to reserve a table id.. if the machine fails during this step
+ // it is ok to retry... the only side effect is that a table id may be allocated
+ // but never used
+
+ // assuming only the master process is creating tables
+
+ Utils.idLock.lock();
+ try {
+ tableInfo.tableId = Utils.getNextTableId(tableInfo.tableName, master.getInstance());
+ return new SetupPermissions(tableInfo);
+ } finally {
+ Utils.idLock.unlock();
+ }
+
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ // nothing to undo; the only side effect so far is an allocated table id
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
new file mode 100644
index 0000000..4271257
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.GrepIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
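+/**
+ * Final step of table deletion. isReady() waits until none of the table's tablets are assigned
+ * or hosted; call() then removes the table's metadata entries, files, zookeeper state, and
+ * permissions.
+ */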
+class CleanUp extends MasterRepo {
+
+ final private static Logger log = Logger.getLogger(CleanUp.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private String tableId;
+
+ private long creationTime;
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ /*
+ * handle the case where we start executing on a new machine where the current time is in the past relative to the previous machine
+ *
+ * if the new machine has time in the future, that will work ok w/ hasCycled
+ */
+ if (System.currentTimeMillis() < creationTime) {
+ creationTime = System.currentTimeMillis();
+ }
+
+ }
+
+ public CleanUp(String tableId) {
+ this.tableId = tableId;
+ creationTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public long isReady(long tid, Master master) throws Exception {
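+ // wait until the master has completed at least one full tablet scan since this repo was created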
+ if (!master.hasCycled(creationTime)) {
+ return 50;
+ }
+
+ boolean done = true;
+ Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
+ Scanner scanner = master.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ MetaDataTableScanner.configureScanner(scanner, master);
+ scanner.setRange(tableRange);
+
+ KeyExtent prevExtent = null;
+ for (Entry<Key,Value> entry : scanner) {
+ TabletLocationState locationState = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
+ if (!locationState.extent.isPreviousExtent(prevExtent)) {
+ log.debug("Still waiting for table to be deleted: " + tableId + " saw inconsistency" + prevExtent + " " + locationState.extent);
+ done = false;
+ break;
+ }
+ prevExtent = locationState.extent;
+
+ TabletState state = locationState.getState(master.onlineTabletServers());
+ if (state.equals(TabletState.ASSIGNED) || state.equals(TabletState.HOSTED)) {
+ log.debug("Still waiting for table to be deleted: " + tableId + " locationState: " + locationState);
+ done = false;
+ break;
+ }
+ }
+
+ if (!done)
+ return 50;
+
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+
+ master.clearMigrations(tableId);
+
+ int refCount = 0;
+
+ try {
+ // look for other tables that reference this table's files
+ Connector conn = master.getConnector();
+ BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 8);
+ try {
+ Range allTables = MetadataSchema.TabletsSection.getRange();
+ Range tableRange = MetadataSchema.TabletsSection.getRange(tableId);
+ Range beforeTable = new Range(allTables.getStartKey(), true, tableRange.getStartKey(), false);
+ Range afterTable = new Range(tableRange.getEndKey(), false, allTables.getEndKey(), true);
+ bs.setRanges(Arrays.asList(beforeTable, afterTable));
+ bs.fetchColumnFamily(DataFileColumnFamily.NAME);
+ IteratorSetting cfg = new IteratorSetting(40, "grep", GrepIterator.class);
+ GrepIterator.setTerm(cfg, "/" + tableId + "/");
+ bs.addScanIterator(cfg);
+
+ for (Entry<Key,Value> entry : bs) {
+ if (entry.getKey().getColumnQualifier().toString().contains("/" + tableId + "/")) {
+ refCount++;
+ }
+ }
+ } finally {
+ bs.close();
+ }
+
+ } catch (Exception e) {
+ refCount = -1;
+ log.error("Failed to scan " + MetadataTable.NAME + " looking for references to deleted table " + tableId, e);
+ }
+
+ // remove metadata table entries
+ try {
+ // Intentionally do not pass the master lock. If the master loses its lock, this operation may complete before the master can kill itself.
+ // If the master lock were passed to deleteTable, it is possible that the delete mutations would be dropped. If the delete mutations
+ // are dropped and the operation completes, then the deletes will not be repeated.
+ MetadataTableUtil.deleteTable(tableId, refCount != 0, SystemCredentials.get(), null);
+ } catch (Exception e) {
+ log.error("error deleting " + tableId + " from metadata table", e);
+ }
+
+ // remove any problem reports the table may have
+ try {
+ ProblemReports.getInstance().deleteProblemReports(tableId);
+ } catch (Exception e) {
+ log.error("Failed to delete problem reports for table " + tableId, e);
+ }
+
+ if (refCount == 0) {
+ // delete the map files
+ try {
+ VolumeManager fs = master.getFileSystem();
+ for (String dir : ServerConstants.getTablesDirs()) {
+ fs.deleteRecursively(new Path(dir, tableId));
+ }
+ } catch (IOException e) {
+ log.error("Unable to remove deleted table directory", e);
+ }
+ }
+
+ // remove table from zookeeper
+ try {
+ TableManager.getInstance().removeTable(tableId);
+ Tables.clearCache(master.getInstance());
+ } catch (Exception e) {
+ log.error("Failed to find table id in zookeeper", e);
+ }
+
+ // remove any permissions associated with this table
+ try {
+ AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(master.getInstance()), tableId);
+ } catch (ThriftSecurityException e) {
+ log.error(e.getMessage(), e);
+ }
+
+ Utils.unreserveTable(tableId, tid, true);
+
+ log.debug("Deleted table " + tableId);
+
+ return null;
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ // nothing to do
+ }
+
+}
+
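+/**
+ * FATE operation that deletes a table. Transitions the table to DELETING, then hands off to
+ * CleanUp to wait for tablets to be unloaded and remove all table state.
+ */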
+public class DeleteTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private String tableId;
+
+ public DeleteTable(String tableId) {
+ this.tableId = tableId;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
+ environment.getEventCoordinator().event("deleting table %s ", tableId);
+ return new CleanUp(tableId);
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ Utils.unreserveTable(tableId, tid, true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java
new file mode 100644
index 0000000..22df3b3
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+class ExportInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public String tableName;
+ public String tableID;
+ public String exportDir;
+}
+
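+/**
+ * Writes the export artifacts for an offline table: a zip file containing the export info,
+ * table configuration, and metadata entries, plus a distcp.txt listing the files to copy.
+ */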
+class WriteExportFiles extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+ private final ExportInfo tableInfo;
+
+ WriteExportFiles(ExportInfo tableInfo) {
+ this.tableInfo = tableInfo;
+ }
+
+ private void checkOffline(Connector conn) throws Exception {
+ if (Tables.getTableState(conn.getInstance(), tableInfo.tableID) != TableState.OFFLINE) {
+ Tables.clearCache(conn.getInstance());
+ if (Tables.getTableState(conn.getInstance(), tableInfo.tableID) != TableState.OFFLINE) {
+ throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
+ "Table is not offline");
+ }
+ }
+ }
+
+ @Override
+ public long isReady(long tid, Master master) throws Exception {
+
+ long reserved = Utils.reserveTable(tableInfo.tableID, tid, false, true, TableOperation.EXPORT);
+ if (reserved > 0)
+ return reserved;
+
+ Connector conn = master.getConnector();
+
+ checkOffline(conn);
+
+ Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.setRange(new KeyExtent(new Text(tableInfo.tableID), null, null).toMetadataRange());
+
+ // scan for locations
+ metaScanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+ metaScanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
+
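+ // if any tablet still has a current or future location, the table is not yet fully offline; check again later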
+ if (metaScanner.iterator().hasNext()) {
+ return 500;
+ }
+
+ // use the same range to check for walogs that we used to check for hosted (or future hosted) tablets
+ // this is done as a separate scan after we check for locations, because walogs are okay only if there is no location
+ metaScanner.clearColumns();
+ metaScanner.fetchColumnFamily(LogColumnFamily.NAME);
+
+ if (metaScanner.iterator().hasNext()) {
+ throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
+ "Write ahead logs found for table");
+ }
+
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ Connector conn = master.getConnector();
+
+ try {
+ exportTable(master.getFileSystem(), conn, tableInfo.tableName, tableInfo.tableID, tableInfo.exportDir);
+ } catch (IOException ioe) {
+ throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
+ "Failed to create export files " + ioe.getMessage());
+ }
+ Utils.unreserveTable(tableInfo.tableID, tid, false);
+ Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+ return null;
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ Utils.unreserveTable(tableInfo.tableID, tid, false);
+ }
+
+ public static void exportTable(VolumeManager fs, Connector conn, String tableName, String tableID, String exportDir) throws Exception {
+
+ fs.mkdirs(new Path(exportDir));
+ Path exportMetaFilePath = fs.getFileSystemByPath(new Path(exportDir)).makeQualified(new Path(exportDir, Constants.EXPORT_FILE));
+
+ FSDataOutputStream fileOut = fs.create(exportMetaFilePath, false);
+ ZipOutputStream zipOut = new ZipOutputStream(fileOut);
+ BufferedOutputStream bufOut = new BufferedOutputStream(zipOut);
+ DataOutputStream dataOut = new DataOutputStream(bufOut);
+
+ try {
+
+ zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_INFO_FILE));
+ OutputStreamWriter osw = new OutputStreamWriter(dataOut);
+ osw.append(ExportTable.EXPORT_VERSION_PROP + ":" + ExportTable.VERSION + "\n");
+ osw.append("srcInstanceName:" + conn.getInstance().getInstanceName() + "\n");
+ osw.append("srcInstanceID:" + conn.getInstance().getInstanceID() + "\n");
+ osw.append("srcZookeepers:" + conn.getInstance().getZooKeepers() + "\n");
+ osw.append("srcTableName:" + tableName + "\n");
+ osw.append("srcTableID:" + tableID + "\n");
+ osw.append(ExportTable.DATA_VERSION_PROP + ":" + ServerConstants.DATA_VERSION + "\n");
+ osw.append("srcCodeVersion:" + Constants.VERSION + "\n");
+
+ osw.flush();
+ dataOut.flush();
+
+ exportConfig(conn, tableID, zipOut, dataOut);
+ dataOut.flush();
+
+ Map<String,String> uniqueFiles = exportMetadata(fs, conn, tableID, zipOut, dataOut);
+
+ dataOut.close();
+ dataOut = null;
+
+ createDistcpFile(fs, exportDir, exportMetaFilePath, uniqueFiles);
+
+ } finally {
+ if (dataOut != null)
+ dataOut.close();
+ }
+ }
+
+ private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException {
+ BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false)));
+
+ try {
+ for (String file : uniqueFiles.values()) {
+ distcpOut.append(file);
+ distcpOut.newLine();
+ }
+
+ distcpOut.append(exportMetaFilePath.toString());
+ distcpOut.newLine();
+
+ distcpOut.close();
+ distcpOut = null;
+
+ } finally {
+ if (distcpOut != null)
+ distcpOut.close();
+ }
+ }
+
+ private static Map<String,String> exportMetadata(VolumeManager fs, Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut)
+ throws IOException, TableNotFoundException {
+ zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
+
+ Map<String,String> uniqueFiles = new HashMap<String,String>();
+
+ Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(metaScanner);
+ TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(metaScanner);
+ metaScanner.setRange(new KeyExtent(new Text(tableID), null, null).toMetadataRange());
+
+ for (Entry<Key,Value> entry : metaScanner) {
+ entry.getKey().write(dataOut);
+ entry.getValue().write(dataOut);
+
+ if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ String path = fs.getFullPath(entry.getKey()).toString();
+ String tokens[] = path.split("/");
+ if (tokens.length < 1) {
+ throw new RuntimeException("Illegal path " + path);
+ }
+
+ String filename = tokens[tokens.length - 1];
+
+ String existingPath = uniqueFiles.get(filename);
+ if (existingPath == null) {
+ uniqueFiles.put(filename, path);
+ } else if (!existingPath.equals(path)) {
+ // make sure file names are unique; this should only apply to tables with file names generated by Accumulo 1.3 and earlier
+ throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table.");
+ }
+
+ }
+ }
+ return uniqueFiles;
+ }
+
+ private static void exportConfig(Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws AccumuloException,
+ AccumuloSecurityException, TableNotFoundException, IOException {
+
+ DefaultConfiguration defaultConfig = AccumuloConfiguration.getDefaultConfiguration();
+ Map<String,String> siteConfig = conn.instanceOperations().getSiteConfiguration();
+ Map<String,String> systemConfig = conn.instanceOperations().getSystemConfiguration();
+
+ TableConfiguration tableConfig = ServerConfiguration.getTableConfiguration(conn.getInstance(), tableID);
+
+ OutputStreamWriter osw = new OutputStreamWriter(dataOut);
+
+ // only put props that differ from the defaults and from higher-level (site/system) configuration
+ zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_TABLE_CONFIG_FILE));
+ for (Entry<String,String> prop : tableConfig) {
+ if (prop.getKey().startsWith(Property.TABLE_PREFIX.getKey())) {
+ Property key = Property.getPropertyByKey(prop.getKey());
+
+ if (key == null || !defaultConfig.get(key).equals(prop.getValue())) {
+ if (!prop.getValue().equals(siteConfig.get(prop.getKey())) && !prop.getValue().equals(systemConfig.get(prop.getKey()))) {
+ osw.append(prop.getKey() + "=" + prop.getValue() + "\n");
+ }
+ }
+ }
+ }
+
+ osw.flush();
+ }
+}
+
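+/**
+ * FATE operation that exports an offline table. Reserves the export directory, then hands off
+ * to WriteExportFiles.
+ */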
+public class ExportTable extends MasterRepo {
+ private static final long serialVersionUID = 1L;
+
+ private final ExportInfo tableInfo;
+
+ public ExportTable(String tableName, String tableId, String exportDir) {
+ tableInfo = new ExportInfo();
+ tableInfo.tableName = tableName;
+ tableInfo.exportDir = exportDir;
+ tableInfo.tableID = tableId;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master env) throws Exception {
+ return new WriteExportFiles(tableInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+ }
+
+ public static final int VERSION = 1;
+
+ public static final String DATA_VERSION_PROP = "srcDataVersion";
+ public static final String EXPORT_VERSION_PROP = "exportVersion";
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java
new file mode 100644
index 0000000..b91da52
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.admin.TableOperationsImpl;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tablets.UniqueNameAllocator;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.TablePropUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Serializable state shared by the steps of the import table FATE operation.
+ */
+class ImportedTableInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public String exportDir;
+ public String user;
+ public String tableName;
+ public String tableId;
+ public String importDir;
+}
+
+class FinishImportTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private ImportedTableInfo tableInfo;
+
+ public FinishImportTable(ImportedTableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master env) throws Exception {
+
+ env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt"));
+
+ TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
+
+ Utils.unreserveTable(tableInfo.tableId, tid, true);
+
+ Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+
+ env.getEventCoordinator().event("Imported table %s ", tableInfo.tableName);
+
+ Logger.getLogger(FinishImportTable.class).debug("Imported table " + tableInfo.tableId + " " + tableInfo.tableName);
+
+ return null;
+ }
+
+ @Override
+ public String getReturn() {
+ return tableInfo.tableId;
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {}
+
+}
+
+class MoveExportedFiles extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private ImportedTableInfo tableInfo;
+
+ MoveExportedFiles(ImportedTableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ try {
+ VolumeManager fs = master.getFileSystem();
+
+ Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs, tableInfo);
+
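+ // fail fast if any file named in the mapping is missing from the export
+ // directory, before any renames happen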
+ for (String oldFileName : fileNameMappings.keySet()) {
+ if (!fs.exists(new Path(tableInfo.exportDir, oldFileName))) {
+ throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "File referenced by exported table does not exists " + oldFileName);
+ }
+ }
+
+ FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
+
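+ // move each exported file into the import directory under its new name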
+ for (FileStatus fileStatus : files) {
+ String newName = fileNameMappings.get(fileStatus.getPath().getName());
+
+ if (newName != null)
+ fs.rename(fileStatus.getPath(), new Path(tableInfo.importDir, newName));
+ }
+
+ return new FinishImportTable(tableInfo);
+ } catch (IOException ioe) {
+ log.warn(ioe.getMessage(), ioe);
+ throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "Error renaming files " + ioe.getMessage());
+ }
+ }
+}
+
+class PopulateMetadataTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private ImportedTableInfo tableInfo;
+
+ PopulateMetadataTable(ImportedTableInfo ti) {
+ this.tableInfo = ti;
+ }
+
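+ // mappings.txt pairs each original file name with its new import name, one
+ // "old:new" entry per line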
+ static Map<String,String> readMappingFile(VolumeManager fs, ImportedTableInfo tableInfo) throws Exception {
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt"))));
+
+ try {
+ Map<String,String> map = new HashMap<String,String>();
+
+ String line = null;
+ while ((line = in.readLine()) != null) {
+ String[] sa = line.split(":", 2);
+ map.put(sa[0], sa[1]);
+ }
+
+ return map;
+ } finally {
+ in.close();
+ }
+
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+
+ Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
+
+ BatchWriter mbw = null;
+ ZipInputStream zis = null;
+
+ try {
+ VolumeManager fs = master.getFileSystem();
+
+ mbw = master.getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+ zis = new ZipInputStream(fs.open(path));
+
+ Map<String,String> fileNameMappings = readMappingFile(fs, tableInfo);
+
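+ // tablet file references in the metadata will be rewritten relative to this
+ // bulk directory name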
+ String bulkDir = new Path(tableInfo.importDir).getName();
+
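+ // scan the export zip for the serialized metadata of the source table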
+ ZipEntry zipEntry;
+ while ((zipEntry = zis.getNextEntry()) != null) {
+ if (zipEntry.getName().equals(Constants.EXPORT_METADATA_FILE)) {
+ DataInputStream in = new DataInputStream(new BufferedInputStream(zis));
+
+ Key key = new Key();
+ Value val = new Value();
+
+ Mutation m = null;
+ Text currentRow = null;
+ int dirCount = 0;
+
+ while (true) {
+ key.readFields(in);
+ val.readFields(in);
+
+ Text endRow = new KeyExtent(key.getRow(), (Text) null).getEndRow();
+ Text metadataRow = new KeyExtent(new Text(tableInfo.tableId), endRow, null).getMetadataEntry();
+
+ Text cq;
+
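+ // data file entries must point at the renamed files in the import
+ // directory; all other columns are copied as is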
+ if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ String oldName = new Path(key.getColumnQualifier().toString()).getName();
+ String newName = fileNameMappings.get(oldName);
+
+ if (newName == null) {
+ throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "File " + oldName + " does not exist in import dir");
+ }
+
+ cq = new Text("/" + bulkDir + "/" + newName);
+ } else {
+ cq = key.getColumnQualifier();
+ }
+
+ if (m == null) {
+ m = new Mutation(metadataRow);
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(FastFormat.toZeroPaddedString(dirCount++, 8, 16, "/c-".getBytes())));
+ currentRow = metadataRow;
+ }
+
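+ // the row changed: flush the previous tablet's mutation and start a new one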
+ if (!currentRow.equals(metadataRow)) {
+ mbw.addMutation(m);
+ m = new Mutation(metadataRow);
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(FastFormat.toZeroPaddedString(dirCount++, 8, 16, "/c-".getBytes())));
+ currentRow = metadataRow;
+ }
+
+ m.put(key.getColumnFamily(), cq, val);
+
+ if (endRow == null && TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+ mbw.addMutation(m);
+ break; // it's the last column in the last row
+ }
+ }
+
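+ // the metadata entry has been fully processed; nothing else in the zip is needed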
+ break;
+ }
+ }
+
+ return new MoveExportedFiles(tableInfo);
+ } catch (IOException ioe) {
+ log.warn(ioe.getMessage(), ioe);
+ throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "Error reading " + path + " " + ioe.getMessage());
+ } finally {
+ if (zis != null) {
+ try {
+ zis.close();
+ } catch (IOException ioe) {
+ log.warn("Failed to close zip file ", ioe);
+ }
+ }
+
+ if (mbw != null) {
+ mbw.close();
+ }
+ }
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ MetadataTableUtil.deleteTable(tableInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
+ }
+}
+
+class MapImportFileNames extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private ImportedTableInfo tableInfo;
+
+ MapImportFileNames(ImportedTableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+
+ Path path = new Path(tableInfo.importDir, "mappings.txt");
+
+ BufferedWriter mappingsWriter = null;
+
+ try {
+ VolumeManager fs = environment.getFileSystem();
+
+ fs.mkdirs(new Path(tableInfo.importDir));
+
+ FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
+
+ UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+
+ mappingsWriter = new BufferedWriter(new OutputStreamWriter(fs.create(path)));
+
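+ // assign each data file a new cluster-unique name; files with an
+ // unrecognized extension are skipped, and extensionless files are assumed
+ // to be map files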
+ for (FileStatus fileStatus : files) {
+ String fileName = fileStatus.getPath().getName();
+ log.info("filename " + fileStatus.getPath().toString());
+ String[] sa = fileName.split("\\.");
+ String extension = "";
+ if (sa.length > 1) {
+ extension = sa[sa.length - 1];
+
+ if (!FileOperations.getValidExtensions().contains(extension)) {
+ continue;
+ }
+ } else {
+ // assume it is a map file
+ extension = Constants.MAPFILE_EXTENSION;
+ }
+
+ String newName = "I" + namer.getNextName() + "." + extension;
+
+ mappingsWriter.append(fileName);
+ mappingsWriter.append(':');
+ mappingsWriter.append(newName);
+ mappingsWriter.newLine();
+ }
+
+ mappingsWriter.close();
+ mappingsWriter = null;
+
+ return new PopulateMetadataTable(tableInfo);
+ } catch (IOException ioe) {
+ log.warn(ioe.getMessage(), ioe);
+ throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "Error writing mapping file " + path + " " + ioe.getMessage());
+ } finally {
+ if (mappingsWriter != null)
+ try {
+ mappingsWriter.close();
+ } catch (IOException ioe) {
+ log.warn("Failed to close " + path, ioe);
+ }
+ }
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir));
+ }
+}
+
+class CreateImportDir extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private ImportedTableInfo tableInfo;
+
+ CreateImportDir(ImportedTableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+
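+ // choose a tables directory on the same volume as the export directory so
+ // the later renames are moves within one filesystem rather than copies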
+ UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+
+ Path base = master.getFileSystem().matchingFileSystem(new Path(tableInfo.exportDir), ServerConstants.getTablesDirs());
+ Path directory = new Path(base, tableInfo.tableId);
+
+ Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
+
+ tableInfo.importDir = newBulkDir.toString();
+
+ return new MapImportFileNames(tableInfo);
+ }
+}
+
+class ImportPopulateZookeeper extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private ImportedTableInfo tableInfo;
+
+ ImportPopulateZookeeper(ImportedTableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.IMPORT);
+ }
+
+ private Map<String,String> getExportedProps(VolumeManager fs) throws Exception {
+
+ Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
+
+ try {
+ FileSystem ns = fs.getFileSystemByPath(path);
+ return TableOperationsImpl.getExportedProps(ns, path);
+ } catch (IOException ioe) {
+ throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "Error reading table props from " + path + " " + ioe.getMessage());
+ }
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master env) throws Exception {
+ // reserve the table name in zookeeper or fail
+
+ Utils.tableNameLock.lock();
+ try {
+ // write tableName & tableId to zookeeper
+ Instance instance = HdfsZooInstance.getInstance();
+
+ Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE);
+
+ TableManager.getInstance().addTable(tableInfo.tableId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE);
+
+ Tables.clearCache(instance);
+ } finally {
+ Utils.tableNameLock.unlock();
+ }
+
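+ // apply the properties recorded in the export file to the new table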
+ for (Entry<String,String> entry : getExportedProps(env.getFileSystem()).entrySet())
+ if (!TablePropUtil.setTableProperty(tableInfo.tableId, entry.getKey(), entry.getValue())) {
+ throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "Invalid table property " + entry.getKey());
+ }
+
+ return new CreateImportDir(tableInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ Instance instance = HdfsZooInstance.getInstance();
+ TableManager.getInstance().removeTable(tableInfo.tableId);
+ Utils.unreserveTable(tableInfo.tableId, tid, true);
+ Tables.clearCache(instance);
+ }
+}
+
+class ImportSetupPermissions extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private ImportedTableInfo tableInfo;
+
+ public ImportSetupPermissions(ImportedTableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master env) throws Exception {
+ // give all table permissions to the creator
+ SecurityOperation security = AuditedSecurityOperation.getInstance();
+ for (TablePermission permission : TablePermission.values()) {
+ try {
+ security.grantTablePermission(SystemCredentials.get().toThrift(env.getInstance()), tableInfo.user, tableInfo.tableId, permission);
+ } catch (ThriftSecurityException e) {
+ Logger.getLogger(ImportSetupPermissions.class).error(e.getMessage(), e);
+ throw e;
+ }
+ }
+
+ // set up permissions in zookeeper before the table info in zookeeper, so
+ // that concurrent users do not get a spurious permission denied error
+ return new ImportPopulateZookeeper(tableInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(env.getInstance()), tableInfo.tableId);
+ }
+}
+
+public class ImportTable extends MasterRepo {
+ private static final long serialVersionUID = 1L;
+
+ private ImportedTableInfo tableInfo;
+
+ public ImportTable(String user, String tableName, String exportDir) {
+ tableInfo = new ImportedTableInfo();
+ tableInfo.tableName = tableName;
+ tableInfo.user = user;
+ tableInfo.exportDir = exportDir;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master env) throws Exception {
+ checkVersions(env);
+
+ // the first step is to reserve a table id; if the master fails during this
+ // step it is safe to retry, since the only side effect is that a table id
+ // may be skipped and never used
+
+ // this assumes only the master process is creating tables
+
+ Utils.idLock.lock();
+ try {
+ Instance instance = HdfsZooInstance.getInstance();
+ tableInfo.tableId = Utils.getNextTableId(tableInfo.tableName, instance);
+ return new ImportSetupPermissions(tableInfo);
+ } finally {
+ Utils.idLock.unlock();
+ }
+ }
+
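+ // verify the export file format and data versions are ones this code can read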
+ public void checkVersions(Master env) throws ThriftTableOperationException {
+ Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
+
+ ZipInputStream zis = null;
+
+ try {
+ zis = new ZipInputStream(env.getFileSystem().open(path));
+
+ Integer exportVersion = null;
+ Integer dataVersion = null;
+
+ ZipEntry zipEntry;
+ while ((zipEntry = zis.getNextEntry()) != null) {
+ if (zipEntry.getName().equals(Constants.EXPORT_INFO_FILE)) {
+ BufferedReader in = new BufferedReader(new InputStreamReader(zis));
+ String line = null;
+ while ((line = in.readLine()) != null) {
+ String[] sa = line.split(":", 2);
+ if (sa[0].equals(ExportTable.EXPORT_VERSION_PROP)) {
+ exportVersion = Integer.parseInt(sa[1]);
+ } else if (sa[0].equals(ExportTable.DATA_VERSION_PROP)) {
+ dataVersion = Integer.parseInt(sa[1]);
+ }
+ }
+
+ break;
+ }
+ }
+
+ zis.close();
+ zis = null;
+
+ if (exportVersion == null || exportVersion > ExportTable.VERSION)
+ throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "Incompatible export version " + exportVersion);
+
+ if (dataVersion == null || dataVersion > ServerConstants.DATA_VERSION)
+ throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "Incompatible data version " + exportVersion);
+
+ } catch (IOException ioe) {
+ log.warn(ioe.getMessage(), ioe);
+ throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+ "Failed to read export metadata " + ioe.getMessage());
+ } finally {
+ if (zis != null)
+ try {
+ zis.close();
+ } catch (IOException ioe) {
+ log.warn(ioe.getMessage(), ioe);
+ }
+ }
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/MasterRepo.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/MasterRepo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MasterRepo.java
new file mode 100644
index 0000000..dfd287c
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MasterRepo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.log4j.Logger;
+
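+/**
+ * Base class for master-side FATE operations. Supplies a shared logger and
+ * default implementations (isReady reports ready immediately, undo is a no-op,
+ * getReturn returns null) so subclasses only override what they need.
+ */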
+public abstract class MasterRepo implements Repo<Master> {
+
+ private static final long serialVersionUID = 1L;
+ protected static final Logger log = Logger.getLogger(MasterRepo.class);
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {}
+
+ @Override
+ public String getDescription() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String getReturn() {
+ return null;
+ }
+
+ @Override
+ abstract public Repo<Master> call(long tid, Master environment) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
new file mode 100644
index 0000000..3069aaf
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+
+public class RenameTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+ private String tableId;
+ private String oldTableName;
+ private String newTableName;
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveTable(tableId, tid, true, true, TableOperation.RENAME);
+ }
+
+ public RenameTable(String tableId, String oldTableName, String newTableName) {
+ this.tableId = tableId;
+ this.oldTableName = oldTableName;
+ this.newTableName = newTableName;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+
+ Instance instance = master.getInstance();
+
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ Utils.tableNameLock.lock();
+ try {
+ Utils.checkTableDoesNotExist(instance, newTableName, tableId, TableOperation.RENAME);
+
+ final String tap = ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAME;
+
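+ // atomically swap the table name node in zookeeper; a retry that finds the
+ // new name already in place treats the rename as done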
+ zoo.mutate(tap, null, null, new Mutator() {
+ public byte[] mutate(byte[] current) throws Exception {
+ final String currentName = new String(current);
+ if (currentName.equals(newTableName))
+ return null; // assume in this case the operation is running again, so we are done
+ if (!currentName.equals(oldTableName)) {
+ throw new ThriftTableOperationException(null, oldTableName, TableOperation.RENAME, TableOperationExceptionType.NOTFOUND,
+ "Name changed while processing");
+ }
+ return newTableName.getBytes();
+ }
+ });
+ Tables.clearCache(instance);
+ } finally {
+ Utils.tableNameLock.unlock();
+ Utils.unreserveTable(tableId, tid, true);
+ }
+
+ Logger.getLogger(RenameTable.class).debug("Renamed table " + tableId + " " + oldTableName + " " + newTableName);
+
+ return null;
+ }
+
+ @Override
+ public void undo(long tid, Master env) throws Exception {
+ Utils.unreserveTable(tableId, tid, true);
+ }
+
+}