You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:28 UTC
[49/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
new file mode 100644
index 0000000..18381c7
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance.AccumuloNotInitializedException;
+import org.apache.log4j.Logger;
+
+/**
+ * An {@link AccumuloConfiguration} that looks properties up in ZooKeeper (through a shared {@link ZooCache}) and falls back to a parent configuration when
+ * ZooKeeper has no usable value.
+ * <p>
+ * The class is a process-wide singleton: the first call to either {@code getInstance} overload creates the instance, the ZooKeeper cache and the instance id;
+ * later calls return that same instance and ignore their arguments.
+ */
+public class ZooConfiguration extends AccumuloConfiguration {
+  private static final Logger log = Logger.getLogger(ZooConfiguration.class);
+
+  private final AccumuloConfiguration parent;
+  private static ZooConfiguration instance = null;
+  private static String instanceId = null;
+  private static ZooCache propCache = null;
+  // Values of "fixed" properties, remembered after their first lookup. May hold null values.
+  private final Map<String,String> fixedProps = Collections.synchronizedMap(new HashMap<String,String>());
+
+  private ZooConfiguration(AccumuloConfiguration parent) {
+    this.parent = parent;
+  }
+
+  /**
+   * Returns the singleton, creating it on first use from the ZooKeeper connection details and instance id of the given {@link Instance}.
+   *
+   * @param inst
+   *          source of the ZooKeeper hosts, session timeout and instance id; only consulted when the singleton does not exist yet
+   * @param parent
+   *          configuration consulted when ZooKeeper has no usable value; only used when the singleton does not exist yet
+   * @return the shared configuration instance
+   */
+  public static synchronized ZooConfiguration getInstance(Instance inst, AccumuloConfiguration parent) {
+    if (instance == null) {
+      propCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+      instance = new ZooConfiguration(parent);
+      instanceId = inst.getInstanceID();
+    }
+    return instance;
+  }
+
+  /**
+   * Returns the singleton, creating it on first use. The ZooKeeper hosts and timeout are read from {@code parent} and the instance id is read from the
+   * instance-id location in HDFS.
+   *
+   * @param parent
+   *          configuration consulted when ZooKeeper has no usable value; only used when the singleton does not exist yet
+   * @return the shared configuration instance
+   */
+  public static synchronized ZooConfiguration getInstance(AccumuloConfiguration parent) {
+    if (instance == null) {
+      propCache = new ZooCache(parent.get(Property.INSTANCE_ZK_HOST), (int) parent.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+      instance = new ZooConfiguration(parent);
+      String deprecatedInstanceIdFromHdfs = ZooUtil.getInstanceIDFromHdfs(ServerConstants.getInstanceIdLocation());
+      instanceId = deprecatedInstanceIdFromHdfs;
+    }
+    return instance;
+  }
+
+  /**
+   * Clears the shared ZooKeeper cache, if one has been created.
+   */
+  @Override
+  public void invalidateCache() {
+    if (propCache != null)
+      propCache.clear();
+  }
+
+  /**
+   * Resolves a property without consulting the fixed-property cache: the ZooKeeper value is used when the key is a valid ZooKeeper property key and the stored
+   * value matches the property's type; otherwise the parent's value is returned.
+   */
+  private String _get(Property property) {
+    String key = property.getKey();
+    String value = null;
+
+    if (Property.isValidZooPropertyKey(key)) {
+      try {
+        value = get(key);
+      } catch (AccumuloNotInitializedException e) {
+        log.warn("failed to lookup property in zookeeper: " + key, e);
+      }
+    }
+
+    if (value == null || !property.getType().isValidFormat(value)) {
+      if (value != null)
+        log.error("Using parent value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
+      value = parent.get(property);
+    }
+    return value;
+  }
+
+  /**
+   * Returns the value of the given property. Fixed properties are resolved once and the first result is returned on every later call; all other properties are
+   * resolved on each call.
+   *
+   * @param property
+   *          the property to look up
+   * @return the resolved value; may be null if neither ZooKeeper nor the parent supplies one
+   */
+  @Override
+  public String get(Property property) {
+    if (!Property.isFixedZooPropertyKey(property))
+      return _get(property);
+
+    String key = property.getKey();
+    // The lookup and the insert happen under one lock so that two threads racing on the first access cannot both resolve the property and cache different
+    // values. Collections.synchronizedMap synchronizes on the returned map itself, so this lock also excludes its own methods.
+    synchronized (fixedProps) {
+      if (fixedProps.containsKey(key))
+        return fixedProps.get(key);
+      String val = _get(property);
+      fixedProps.put(key, val);
+      return val;
+    }
+  }
+
+  /**
+   * Reads the raw value stored for {@code key} under this instance's configuration node in ZooKeeper.
+   *
+   * @return the stored bytes decoded with the platform default charset, or null when the cache has no data for the node
+   */
+  private String get(String key) {
+    String zPath = ZooUtil.getRoot(instanceId) + Constants.ZCONFIG + "/" + key;
+    byte[] v = propCache.get(zPath);
+    String value = null;
+    if (v != null)
+      value = new String(v);
+    return value;
+  }
+
+  /**
+   * Iterates over the parent's entries overlaid with every non-null entry found under the ZooKeeper configuration node, sorted by key.
+   */
+  @Override
+  public Iterator<Entry<String,String>> iterator() {
+    TreeMap<String,String> entries = new TreeMap<String,String>();
+
+    for (Entry<String,String> parentEntry : parent)
+      entries.put(parentEntry.getKey(), parentEntry.getValue());
+
+    List<String> children = propCache.getChildren(ZooUtil.getRoot(instanceId) + Constants.ZCONFIG);
+    if (children != null) {
+      for (String child : children) {
+        // Skip null names before using them to build a ZooKeeper path.
+        if (child == null)
+          continue;
+        String value = get(child);
+        if (value != null)
+          entries.put(child, value);
+      }
+    }
+
+    /*
+     * //this code breaks the shells ability to show updates just made //the code is probably not needed as fixed props are only obtained through get
+     *
+     * for(Property prop : Property.getFixedProperties()) get(prop);
+     *
+     * for(Entry<String, String> fprop : fixedProps.entrySet()) entries.put(fprop.getKey(), fprop.getValue());
+     */
+
+    return entries.entrySet().iterator();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
new file mode 100644
index 0000000..ce5e5e4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.constraints;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.constraints.Constraint;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Constraint applied to mutations of the metadata table. It validates the row format, the columns being written, and several column-specific rules; see
+ * {@link #getViolationDescription(short)} for the meaning of each violation code.
+ */
+public class MetadataConstraints implements Constraint {
+
+  // Created lazily the first time a lock column has to be verified.
+  private ZooCache zooCache = null;
+  private String zooRoot = null;
+
+  private static final Logger log = Logger.getLogger(MetadataConstraints.class);
+
+  // Lookup table indexed by unsigned byte value: true for 'a'-'z', '0'-'9' and '!'.
+  private static final boolean[] validTableNameChars = new boolean[256];
+
+  // Static initializer: the table is shared by all instances, so it is filled once at class load rather than on every construction.
+  static {
+    for (int i = 0; i < 256; i++) {
+      validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= '9')) || i == '!';
+    }
+  }
+
+  // Fully-qualified columns (family + qualifier) that may be written.
+  private static final HashSet<ColumnFQ> validColumnQuals = new HashSet<ColumnFQ>(Arrays.asList(new ColumnFQ[] {
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN,
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN,
+      TabletsSection.ServerColumnFamily.LOCK_COLUMN, TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN}));
+
+  // Column families that may be written with any qualifier.
+  private static final HashSet<Text> validColumnFams = new HashSet<Text>(Arrays.asList(new Text[] {TabletsSection.BulkFileColumnFamily.NAME,
+      LogColumnFamily.NAME, ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME,
+      TabletsSection.CurrentLocationColumnFamily.NAME, TabletsSection.LastLocationColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME,
+      ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME}));
+
+  /**
+   * Returns true when the update's family is in {@code validColumnFams} or its family/qualifier pair is in {@code validColumnQuals}.
+   */
+  private static boolean isValidColumn(ColumnUpdate cu) {
+
+    if (validColumnFams.contains(new Text(cu.getColumnFamily())))
+      return true;
+
+    if (validColumnQuals.contains(new ColumnFQ(cu)))
+      return true;
+
+    return false;
+  }
+
+  /**
+   * Appends a violation code to the list, creating the list when it is null, and returns the list.
+   */
+  private static ArrayList<Short> addViolation(ArrayList<Short> lst, int violation) {
+    if (lst == null)
+      lst = new ArrayList<Short>();
+    lst.add((short) violation);
+    return lst;
+  }
+
+  /**
+   * Like {@link #addViolation(ArrayList, int)}, but leaves the list untouched when it already contains the code.
+   */
+  private static ArrayList<Short> addIfNotPresent(ArrayList<Short> lst, int intViolation) {
+    if (lst == null)
+      return addViolation(lst, intViolation);
+    short violation = (short) intViolation;
+    if (!lst.contains(violation))
+      return addViolation(lst, intViolation);
+    return lst;
+  }
+
+  /**
+   * Checks a metadata mutation.
+   *
+   * @param env
+   *          constraint environment; not read by this implementation
+   * @param mutation
+   *          the mutation to validate
+   * @return null when the mutation is acceptable (including any row in the reserved areas starting with {@code ~} or {@code !!~}); otherwise the list of
+   *         violation codes, which may contain a code more than once
+   */
+  @Override
+  public List<Short> check(Environment env, Mutation mutation) {
+
+    ArrayList<Short> violations = null;
+
+    Collection<ColumnUpdate> colUpdates = mutation.getUpdates();
+
+    // check the row, it should contains at least one ; or end with <
+    boolean containsSemiC = false;
+
+    byte[] row = mutation.getRow();
+
+    // always allow rows that fall within reserved areas
+    if (row.length > 0 && row[0] == '~')
+      return null;
+    if (row.length > 2 && row[0] == '!' && row[1] == '!' && row[2] == '~')
+      return null;
+
+    // Only the bytes before the first ';' or '<' are validated against the table-name character set.
+    for (byte b : row) {
+      if (b == ';') {
+        containsSemiC = true;
+      }
+
+      if (b == ';' || b == '<')
+        break;
+
+      if (!validTableNameChars[0xff & b]) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+
+    if (!containsSemiC) {
+      // see if last row char is <
+      if (row.length == 0 || row[row.length - 1] != '<') {
+        violations = addIfNotPresent(violations, 4);
+      }
+    } else {
+      if (row.length == 0) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+
+    // A row starting with '!' must be exactly "!0" followed by '<' or ';'.
+    if (row.length > 0 && row[0] == '!') {
+      if (row.length < 3 || row[1] != '0' || (row[2] != '<' && row[2] != ';')) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+
+    // ensure row is not less than Constants.METADATA_TABLE_ID
+    if (new Text(row).compareTo(new Text(MetadataTable.ID)) < 0) {
+      violations = addViolation(violations, 5);
+    }
+
+    // The bulk-load check inspects the whole mutation, so it only needs to run for the first bulk column seen.
+    boolean checkedBulk = false;
+
+    for (ColumnUpdate columnUpdate : colUpdates) {
+      Text columnFamily = new Text(columnUpdate.getColumnFamily());
+
+      // Deletes only need to target a known column; none of the value checks below apply.
+      if (columnUpdate.isDeleted()) {
+        if (!isValidColumn(columnUpdate)) {
+          violations = addViolation(violations, 2);
+        }
+        continue;
+      }
+
+      if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME)) {
+        violations = addViolation(violations, 6);
+      }
+
+      if (columnFamily.equals(DataFileColumnFamily.NAME)) {
+        try {
+          DataFileValue dfv = new DataFileValue(columnUpdate.getValue());
+
+          if (dfv.getSize() < 0 || dfv.getNumEntries() < 0) {
+            violations = addViolation(violations, 1);
+          }
+        } catch (NumberFormatException nfe) {
+          violations = addViolation(violations, 1);
+        } catch (ArrayIndexOutOfBoundsException aiooe) {
+          violations = addViolation(violations, 1);
+        }
+      } else if (columnFamily.equals(ScanFileColumnFamily.NAME)) {
+        // scan file entries carry no further restrictions
+      } else if (columnFamily.equals(TabletsSection.BulkFileColumnFamily.NAME)) {
+        if (!columnUpdate.isDeleted() && !checkedBulk) {
+          // splits, which also write the time reference, are allowed to write this reference even when
+          // the transaction is not running because the other half of the tablet is holding a reference
+          // to the file.
+          boolean isSplitMutation = false;
+          // When a tablet is assigned, it re-writes the metadata. It should probably only update the location information,
+          // but it writes everything. We allow it to re-write the bulk information if it is setting the location.
+          // See ACCUMULO-1230.
+          boolean isLocationMutation = false;
+
+          HashSet<Text> dataFiles = new HashSet<Text>();
+          HashSet<Text> loadedFiles = new HashSet<Text>();
+
+          String tidString = new String(columnUpdate.getValue());
+          int otherTidCount = 0;
+
+          for (ColumnUpdate update : mutation.getUpdates()) {
+            if (new ColumnFQ(update).equals(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN)) {
+              isSplitMutation = true;
+            } else if (new Text(update.getColumnFamily()).equals(TabletsSection.CurrentLocationColumnFamily.NAME)) {
+              isLocationMutation = true;
+            } else if (new Text(update.getColumnFamily()).equals(DataFileColumnFamily.NAME)) {
+              dataFiles.add(new Text(update.getColumnQualifier()));
+            } else if (new Text(update.getColumnFamily()).equals(TabletsSection.BulkFileColumnFamily.NAME)) {
+              loadedFiles.add(new Text(update.getColumnQualifier()));
+
+              if (!new String(update.getValue()).equals(tidString)) {
+                otherTidCount++;
+              }
+            }
+          }
+
+          if (!isSplitMutation && !isLocationMutation) {
+            // NOTE(review): a non-numeric transaction id makes parseLong throw NumberFormatException out of check(), because the parse sits outside the
+            // try block below - confirm the caller tolerates that.
+            long tid = Long.parseLong(tidString);
+
+            try {
+              // All bulk columns must share one transaction id, the loaded files must match the data files written, and the transaction must be alive.
+              if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
+                violations = addViolation(violations, 8);
+              }
+            } catch (Exception ex) {
+              // Any failure to verify the transaction is treated as the transaction not running.
+              violations = addViolation(violations, 8);
+            }
+          }
+
+          checkedBulk = true;
+        }
+      } else {
+        if (!isValidColumn(columnUpdate)) {
+          violations = addViolation(violations, 2);
+        } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN) && columnUpdate.getValue().length > 0
+            && (violations == null || !violations.contains((short) 4))) {
+          // Only attempted when the row format is valid, since the row is parsed into a KeyExtent here.
+          KeyExtent ke = new KeyExtent(new Text(mutation.getRow()), (Text) null);
+
+          Text per = KeyExtent.decodePrevEndRow(new Value(columnUpdate.getValue()));
+
+          boolean prevEndRowLessThanEndRow = per == null || ke.getEndRow() == null || per.compareTo(ke.getEndRow()) < 0;
+
+          if (!prevEndRowLessThanEndRow) {
+            violations = addViolation(violations, 3);
+          }
+        } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.ServerColumnFamily.LOCK_COLUMN)) {
+          if (zooCache == null) {
+            zooCache = new ZooCache();
+          }
+
+          if (zooRoot == null) {
+            zooRoot = ZooUtil.getRoot(HdfsZooInstance.getInstance());
+          }
+
+          boolean lockHeld = false;
+          String lockId = new String(columnUpdate.getValue());
+
+          try {
+            lockHeld = ZooLock.isLockHeld(zooCache, new ZooUtil.LockID(zooRoot, lockId));
+          } catch (Exception e) {
+            // A failed lookup leaves lockHeld false, so the write is rejected below.
+            log.debug("Failed to verify lock was held " + lockId + " " + e.getMessage());
+          }
+
+          if (!lockHeld) {
+            violations = addViolation(violations, 7);
+          }
+        }
+
+      }
+    }
+
+    if (violations != null) {
+      log.debug("violating metadata mutation : " + new String(mutation.getRow()));
+      for (ColumnUpdate update : mutation.getUpdates()) {
+        log.debug(" update: " + new String(update.getColumnFamily()) + ":" + new String(update.getColumnQualifier()) + " value "
+            + (update.isDeleted() ? "[delete]" : new String(update.getValue())));
+      }
+    }
+
+    return violations;
+  }
+
+  /**
+   * Supplies the arbitrator used to decide whether a bulk-load transaction is still alive. Protected so that subclasses can substitute another one.
+   */
+  protected Arbitrator getArbitrator() {
+    return new ZooArbitrator();
+  }
+
+  /**
+   * Maps a violation code produced by {@link #check(Environment, Mutation)} to a human-readable description.
+   *
+   * @return the description, or null for a code this constraint does not produce
+   */
+  @Override
+  public String getViolationDescription(short violationCode) {
+    switch (violationCode) {
+      case 1:
+        return "data file size must be a non-negative integer";
+      case 2:
+        return "Invalid column name given.";
+      case 3:
+        return "Prev end row is greater than or equal to end row.";
+      case 4:
+        return "Invalid metadata row format";
+      case 5:
+        return "Row can not be less than " + MetadataTable.ID;
+      case 6:
+        return "Empty values are not allowed for any " + MetadataTable.NAME + " column";
+      case 7:
+        return "Lock not held in zookeeper by writer";
+      case 8:
+        return "Bulk load transaction no longer running";
+    }
+    return null;
+  }
+
+  /**
+   * Clears the lazily created ZooKeeper cache, if any, when this object is finalized.
+   */
+  @Override
+  protected void finalize() {
+    if (zooCache != null)
+      zooCache.clear();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java b/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
new file mode 100644
index 0000000..af992a6
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+
+/**
+ * A {@link ColumnUpdate} that belongs to a {@link ServerMutation} and reports that mutation's system timestamp when the update itself has no timestamp.
+ */
+public class ServerColumnUpdate extends ColumnUpdate {
+
+  // The mutation this update belongs to; consulted for the system timestamp.
+  ServerMutation parent;
+
+  /**
+   * Creates the update, passing every column argument through to {@link ColumnUpdate} unchanged.
+   *
+   * @param serverMutation
+   *          the owning mutation, whose system timestamp is used when {@code hasts} is false
+   */
+  public ServerColumnUpdate(byte[] cf, byte[] cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val, ServerMutation serverMutation) {
+    super(cf, cq, cv, hasts, ts, deleted, val);
+    parent = serverMutation;
+  }
+
+  /**
+   * Returns this update's own timestamp when it has one, otherwise the owning mutation's system timestamp at the time of the call.
+   */
+  @Override
+  public long getTimestamp() {
+    if (hasTimestamp())
+      return super.getTimestamp();
+    return parent.getSystemTimestamp();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
new file mode 100644
index 0000000..389cc33
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Mutation that holds system time as computed by the tablet server when not provided by the user.
+ * <p>
+ * The system time is written after the superclass's serialized form, and it takes part in {@link #equals(Object)} and {@link #hashCode()}.
+ */
+public class ServerMutation extends Mutation {
+  private long systemTime = 0L;
+
+  public ServerMutation(TMutation tmutation) {
+    super(tmutation);
+  }
+
+  public ServerMutation(Text key) {
+    super(key);
+  }
+
+  public ServerMutation() {
+  }
+
+  // NOTE(review): presumably a hook the superclass invokes when it discards a timestamp from an older format - confirm against Mutation. The value is kept
+  // as this mutation's system time.
+  protected void droppingOldTimestamp(long ts) {
+    this.systemTime = ts;
+  }
+
+  /**
+   * Reads the superclass fields, then the system time when the serialized format is {@code VERSION2}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    // new format writes system time with the mutation
+    if (getSerializedFormat() == SERIALIZED_FORMAT.VERSION2)
+      systemTime = WritableUtils.readVLong(in);
+  }
+
+  /**
+   * Writes the superclass fields followed by the system time as a variable-length long.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    WritableUtils.writeVLong(out, systemTime);
+  }
+
+  public void setSystemTimestamp(long v) {
+    this.systemTime = v;
+  }
+
+  public long getSystemTimestamp() {
+    return this.systemTime;
+  }
+
+  /**
+   * Creates column updates that can fall back to this mutation's system timestamp.
+   */
+  @Override
+  protected ColumnUpdate newColumnUpdate(byte[] cf, byte[] cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val) {
+    return new ServerColumnUpdate(cf, cq, cv, hasts, ts, deleted, val, this);
+  }
+
+  /**
+   * Adds the eight bytes of the {@code long} system time to the superclass estimate.
+   */
+  @Override
+  public long estimatedMemoryUsed() {
+    return super.estimatedMemoryUsed() + 8;
+  }
+
+  /**
+   * Two server mutations are equal when both are exactly of class {@code ServerMutation}, carry the same system time, and are equal according to the
+   * superclass.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != ServerMutation.class) {
+      return false;
+    }
+    ServerMutation sm = (ServerMutation) o;
+    if (sm.systemTime != systemTime) {
+      return false;
+    }
+    return super.equals(o);
+  }
+
+  /**
+   * Combines the superclass hash with the system time, consistently with {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    // Fold both 32-bit halves of the time into the hash. Masking with the int literal 0xffffffff would be a no-op: it widens to an all-ones long, and the
+    // cast alone would then discard the upper half.
+    result = 31 * result + (int) (systemTime ^ (systemTime >>> 32));
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
new file mode 100644
index 0000000..b4bea4a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * This is a glue object, to convert short file references to long references.
+ * The !METADATA table may contain old relative file references. This class keeps
+ * track of the short file reference, so it can be removed properly from the !METADATA table.
+ * <p>
+ * Ordering, equality and hashing are all based on the full path only; the metadata reference does not take part in them.
+ */
+public class FileRef implements Comparable<FileRef> {
+  String metaReference; // something like ../2/d-00000/A00001.rf
+  Path fullReference; // something like hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf
+
+  /**
+   * Builds a reference from a metadata key: the key's column qualifier is kept as the metadata reference and the volume manager supplies the full path.
+   */
+  public FileRef(VolumeManager fs, Key key) {
+    metaReference = key.getColumnQualifier().toString();
+    fullReference = fs.getFullPath(key);
+  }
+
+  public FileRef(String metaReference, Path fullReference) {
+    this.metaReference = metaReference;
+    this.fullReference = fullReference;
+  }
+
+  /**
+   * Builds a reference whose metadata reference and full path are both derived from the same string.
+   */
+  public FileRef(String path) {
+    this.metaReference = path;
+    this.fullReference = new Path(path);
+  }
+
+  /**
+   * Returns the full path as a string.
+   */
+  @Override
+  public String toString() {
+    return fullReference.toString();
+  }
+
+  /**
+   * Returns the full path.
+   */
+  public Path path() {
+    return fullReference;
+  }
+
+  /**
+   * Returns the metadata reference wrapped in a new {@link Text}.
+   */
+  public Text meta() {
+    return new Text(metaReference);
+  }
+
+  @Override
+  public int compareTo(FileRef o) {
+    return path().compareTo(o.path());
+  }
+
+  @Override
+  public int hashCode() {
+    return path().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof FileRef) {
+      return compareTo((FileRef) obj) == 0;
+    }
+    return false;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java
new file mode 100644
index 0000000..2760b07
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.util.Random;
+
+/**
+ * A {@link VolumeChooser} that returns a pseudo-randomly selected element of the options it is given.
+ */
+public class RandomVolumeChooser implements VolumeChooser {
+  Random random = new Random();
+
+  /**
+   * Picks one element of {@code options} using this chooser's random number generator.
+   *
+   * @param options
+   *          the candidates to choose from
+   * @return one element of {@code options}
+   */
+  @Override
+  public String choose(String[] options) {
+    final int count = options.length;
+    final int pick = random.nextInt(count);
+    return options[pick];
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
new file mode 100644
index 0000000..8713c97
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+
+/**
+ * Strategy for selecting one entry from an array of candidate strings.
+ * NOTE(review): presumably each candidate identifies a storage volume - confirm against the callers.
+ */
+public interface VolumeChooser {
+ /**
+ * Selects one of the given options.
+ *
+ * @param options
+ * the candidates to choose from
+ * @return the chosen option
+ */
+ String choose(String[] options);
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
new file mode 100644
index 0000000..b7787c9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A wrapper around multiple hadoop FileSystem objects, which are assumed to be different volumes.
+ * This also concentrates a bunch of meta-operations like waiting for SAFE_MODE, and closing WALs.
+ */
+public interface VolumeManager {
+
+  /**
+   * The kinds of files handled by a volume manager, each paired with a directory name.
+   */
+  public static enum FileType {
+    TABLE(ServerConstants.TABLE_DIR), WAL(ServerConstants.WAL_DIR), RECOVERY(ServerConstants.RECOVERY_DIR);
+
+    // assigned once in the constructor and never changed afterwards
+    private final String dir;
+
+    FileType(String dir) {
+      this.dir = dir;
+    }
+
+    /**
+     * @return the directory name associated with this file type
+     */
+    public String getDirectory() {
+      return dir;
+    }
+  }
+
+  /** Closes the underlying FileSystems. */
+  void close() throws IOException;
+
+  /** The mechanism by which the master ensures that tablet servers can no longer write to a WAL. */
+  boolean closePossiblyOpenFile(Path path) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  FSDataOutputStream create(Path dest) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  FSDataOutputStream create(Path path, boolean b) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  FSDataOutputStream create(Path path, boolean b, int int1, short int2, long long1) throws IOException;
+
+  /** Creates a file, but only if it doesn't exist. */
+  boolean createNewFile(Path writable) throws IOException;
+
+  /** Creates a file which can be sync'd to disk. */
+  FSDataOutputStream createSyncable(Path logPath, int buffersize, short replication, long blockSize) throws IOException;
+
+  /** Deletes a file. */
+  boolean delete(Path path) throws IOException;
+
+  /** Deletes a directory and anything under it. */
+  boolean deleteRecursively(Path path) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  boolean exists(Path path) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  FileStatus getFileStatus(Path path) throws IOException;
+
+  /** Finds the appropriate FileSystem object given a path. */
+  FileSystem getFileSystemByPath(Path path);
+
+  /** Gets a mapping of volume to FileSystem. */
+  Map<String, ? extends FileSystem> getFileSystems();
+
+  /** Returns the item in options that is in the same volume as source. */
+  Path matchingFileSystem(Path source, String[] options);
+
+  /** Forwards to the appropriate FileSystem object. */
+  FileStatus[] listStatus(Path path) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  boolean mkdirs(Path directory) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  FSDataInputStream open(Path path) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object; throws an exception if the paths are in different volumes. */
+  boolean rename(Path path, Path newPath) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  boolean moveToTrash(Path sourcePath) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  short getDefaultReplication(Path logPath);
+
+  /** Forwards to the appropriate FileSystem object. */
+  boolean isFile(Path path) throws IOException;
+
+  /** Reports whether all volumes are ready to provide service (not in SafeMode, for example). */
+  boolean isReady() throws IOException;
+
+  /** Ambiguous references to files go here. */
+  FileSystem getDefaultVolume();
+
+  /** Forwards to the appropriate FileSystem object. */
+  FileStatus[] globStatus(Path path) throws IOException;
+
+  /** Converts a file or directory !METADATA reference into a path. */
+  Path getFullPath(Key key);
+
+  /** Converts a table id and a path reference into a path. */
+  Path getFullPath(String tableId, String path);
+
+  /** Given a filename, figures out the qualified path given multiple namespaces. */
+  Path getFullPath(FileType fileType, String fileName) throws IOException;
+
+  /** Forwards to the appropriate FileSystem object. */
+  ContentSummary getContentSummary(Path dir) throws IOException;
+
+  /** Decides on which of the given locations to create a new file. */
+  String choose(String[] options);
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
new file mode 100644
index 0000000..39afe75
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+public class VolumeManagerImpl implements VolumeManager {
+
+ private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
+
+ Map<String,? extends FileSystem> volumes;
+ String defaultVolume;
+ AccumuloConfiguration conf;
+ VolumeChooser chooser;
+
+ protected VolumeManagerImpl(Map<String,? extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) {
+ this.volumes = volumes;
+ this.defaultVolume = defaultVolume;
+ this.conf = conf;
+ ensureSyncIsEnabled();
+ chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser());
+ }
+
+ /**
+  * Creates a manager whose single volume, registered under the empty name, is the local file system.
+  *
+  * @return a manager built with the default Accumulo configuration
+  */
+ public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException {
+   final FileSystem localFs = FileSystem.getLocal(CachedConfiguration.getInstance());
+   final Map<String,FileSystem> onlyVolume = Collections.singletonMap("", localFs);
+   return new VolumeManagerImpl(onlyVolume, "", DefaultConfiguration.getDefaultConfiguration());
+ }
+
+ /**
+  * Closes every underlying FileSystem, attempting all of them even when some fail.
+  *
+  * @throws IOException
+  *           the first failure encountered, raised after every volume has been attempted; any further failures are logged
+  */
+ @Override
+ public void close() throws IOException {
+   IOException firstFailure = null;
+   for (FileSystem fs : volumes.values()) {
+     try {
+       fs.close();
+     } catch (IOException e) {
+       if (firstFailure == null) {
+         firstFailure = e;
+       } else {
+         // only one exception can be thrown; log the others instead of silently dropping them
+         log.warn("Additional failure while closing a volume", e);
+       }
+     }
+   }
+   if (firstFailure != null) {
+     throw firstFailure;
+   }
+ }
+
+ /**
+  * Makes sure nobody can keep writing to the given file.
+  *
+  * @return the result of {@code recoverLease} on a DistributedFileSystem; {@code true} after an append/close on a LocalFileSystem
+  * @throws IllegalStateException
+  *           if the file system is neither distributed nor local
+  */
+ @Override
+ public boolean closePossiblyOpenFile(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   if (fs instanceof DistributedFileSystem) {
+     return ((DistributedFileSystem) fs).recoverLease(path);
+   }
+   if (!(fs instanceof LocalFileSystem)) {
+     throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
+   }
+   // local file system: opening for append and closing again is all that is done here
+   fs.append(path).close();
+   log.info("Recovered lease on " + path.toString() + " using append");
+   return true;
+ }
+
+ /** Creates the file on the FileSystem that owns the path. */
+ @Override
+ public FSDataOutputStream create(Path path) throws IOException {
+   return getFileSystemByPath(path).create(path);
+ }
+
+ /** Creates the file on the FileSystem that owns the path, passing the overwrite flag through. */
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
+   return getFileSystemByPath(path).create(path, overwrite);
+ }
+
+ private static long correctBlockSize(Configuration conf, long blockSize) {
+ if (blockSize <= 0)
+ blockSize = conf.getLong("dfs.block.size", 67108864);
+
+ int checkSum = conf.getInt("io.bytes.per.checksum", 512);
+ blockSize -= blockSize % checkSum;
+ blockSize = Math.max(blockSize, checkSum);
+ return blockSize;
+ }
+
+ /** Returns the given buffer size, or the configured "io.file.buffer.size" when the given value is not positive. */
+ private static int correctBufferSize(Configuration conf, int bufferSize) {
+   return bufferSize > 0 ? bufferSize : conf.getInt("io.file.buffer.size", 4096);
+ }
+
+ /**
+  * Creates the file on the FileSystem that owns the path.
+  *
+  * @param bufferSize
+  *          buffer size to use; a non-positive value selects the file system's configured "io.file.buffer.size"
+  * @param blockSize
+  *          block size to use, normalized by {@code correctBlockSize}
+  */
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   final Configuration fsConf = fs.getConf();
+   // The configured default used to be looked up and then discarded, so a zero buffer size
+   // reached fs.create() unchanged; apply it the same way createSyncable does.
+   final int effectiveBufferSize = correctBufferSize(fsConf, bufferSize);
+   return fs.create(path, overwrite, effectiveBufferSize, replication, correctBlockSize(fsConf, blockSize));
+ }
+
+ /** Delegates {@code createNewFile} to the FileSystem that owns the path. */
+ @Override
+ public boolean createNewFile(Path path) throws IOException {
+   return getFileSystemByPath(path).createNewFile(path);
+ }
+
+ /**
+ * Creates a file, trying first to pass the SYNC_BLOCK and CREATE flags through a reflectively resolved
+ * create method, and falling back to a plain overwriting create when that is not possible.
+ *
+ * The block size and buffer size are normalized with correctBlockSize/correctBufferSize before use.
+ *
+ * @return the stream returned by whichever create call was used
+ */
+ @Override
+ public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
+ FileSystem fs = getFileSystemByPath(logPath);
+ blockSize = correctBlockSize(fs.getConf(), blockSize);
+ bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+ try {
+ // This...
+ // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+ // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+ // Becomes this:
+ Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+ List<Enum<?>> flags = new ArrayList<Enum<?>>();
+ if (createFlags.isEnum()) {
+ // collect the two constants by name
+ for (Object constant : createFlags.getEnumConstants()) {
+ if (constant.toString().equals("SYNC_BLOCK")) {
+ flags.add((Enum<?>) constant);
+ log.debug("Found synch enum " + constant);
+ }
+ if (constant.toString().equals("CREATE")) {
+ flags.add((Enum<?>) constant);
+ log.debug("Found CREATE enum " + constant);
+ }
+ }
+ }
+ // if fewer than two flags were found, flags.get(...) throws and the generic handler below falls back
+ Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+ log.debug("CreateFlag set: " + set);
+ Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+ log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+ return (FSDataOutputStream) create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+ } catch (ClassNotFoundException ex) {
+ // Expected in hadoop 1.0
+ return fs.create(logPath, true, bufferSize, replication, blockSize);
+ } catch (Exception ex) {
+ // any other failure above, including one raised by the reflective create call itself, is logged at debug and also falls back
+ log.debug(ex, ex);
+ return fs.create(logPath, true, bufferSize, replication, blockSize);
+ }
+ }
+
+ /** Deletes the path non-recursively on the FileSystem that owns it. */
+ @Override
+ public boolean delete(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   return fs.delete(path, false);
+ }
+
+ /** Deletes the path recursively on the FileSystem that owns it. */
+ @Override
+ public boolean deleteRecursively(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   return fs.delete(path, true);
+ }
+
+ /**
+  * Checks every DistributedFileSystem volume for an append/sync configuration that Accumulo cannot run on.
+  *
+  * Volumes that are not a DistributedFileSystem are skipped. A warning is logged when "dfs.datanode.synconclose" is not enabled and the
+  * CreateFlag class is present.
+  *
+  * @throws RuntimeException
+  *           if append support defaults to off and was not enabled, or if "dfs.support.append" or "dfs.durable.sync" is configured as false
+  */
+ protected void ensureSyncIsEnabled() {
+   for (Entry<String,? extends FileSystem> entry : getFileSystems().entrySet()) {
+     final String volumeName = entry.getKey();
+     final FileSystem fs = entry.getValue();
+
+     if (!(fs instanceof DistributedFileSystem)) {
+       continue;
+     }
+
+     final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append";
+     final String ticketMessage = "See ACCUMULO-623 and ACCUMULO-1637 for more details.";
+
+     // Decided inside the try block but acted upon after it: throwing from inside the try
+     // would be caught by the catch-all handler below and downgraded to a warning.
+     boolean appendDisabled = false;
+     // Check to make sure that we have proper defaults configured
+     try {
+       // If the default is off (0.20.205.x or 1.0.x)
+       DFSConfigKeys configKeys = new DFSConfigKeys();
+
+       // Can't use the final constant itself as Java will inline it at compile time
+       Field dfsSupportAppendDefaultField = configKeys.getClass().getField("DFS_SUPPORT_APPEND_DEFAULT");
+       boolean dfsSupportAppendDefaultValue = dfsSupportAppendDefaultField.getBoolean(configKeys);
+
+       // See if the user did the correct override
+       appendDisabled = !dfsSupportAppendDefaultValue && !fs.getConf().getBoolean(DFS_SUPPORT_APPEND, false);
+     } catch (NoSuchFieldException e) {
+       // If we can't find DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT, the user is running
+       // 1.1.x or 1.2.x. This is ok, though, as, by default, these versions have append/sync enabled.
+     } catch (Exception e) {
+       log.warn("Error while checking for " + DFS_SUPPORT_APPEND + " on volume " + volumeName + ". The user should ensure that Hadoop is configured to properly supports append and sync. " + ticketMessage, e);
+     }
+
+     if (appendDisabled) {
+       String msg = "Accumulo requires that dfs.support.append to true. " + ticketMessage;
+       log.fatal(msg);
+       throw new RuntimeException(msg);
+     }
+
+     // If either of these parameters are configured to be false, fail.
+     // This is a sign that someone is writing bad configuration.
+     if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true) || !fs.getConf().getBoolean(DFS_DURABLE_SYNC, true)) {
+       String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " and " + DFS_DURABLE_SYNC + " not be configured as false. " + ticketMessage;
+       log.fatal(msg);
+       throw new RuntimeException(msg);
+     }
+
+     try {
+       // if this class exists
+       Class.forName("org.apache.hadoop.fs.CreateFlag");
+       // we're running hadoop 2.0, 1.1
+       if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
+         log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss on volume " + volumeName);
+       }
+     } catch (ClassNotFoundException ex) {
+       // hadoop 1.0
+     }
+   }
+
+ }
+
+ /** Delegates {@code exists} to the FileSystem that owns the path. */
+ @Override
+ public boolean exists(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   return fs.exists(path);
+ }
+
+ /** Delegates {@code getFileStatus} to the FileSystem that owns the path. */
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   return fs.getFileStatus(path);
+ }
+
+ /**
+  * Picks the FileSystem for a path: a path whose text contains ':' is resolved through the path itself, anything else goes to the default
+  * volume.
+  *
+  * @throws RuntimeException
+  *           wrapping the IOException raised while resolving the path's FileSystem
+  */
+ @Override
+ public FileSystem getFileSystemByPath(Path path) {
+   if (!path.toString().contains(":")) {
+     return volumes.get(defaultVolume);
+   }
+
+   try {
+     return path.getFileSystem(CachedConfiguration.getInstance());
+   } catch (IOException ex) {
+     throw new RuntimeException(ex);
+   }
+ }
+
+ /** Returns the volume name to FileSystem mapping this manager was built with (the same map instance, not a copy). */
+ @Override
+ public Map<String,? extends FileSystem> getFileSystems() {
+   return this.volumes;
+ }
+
+ /** Delegates {@code listStatus} to the FileSystem that owns the path. */
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   return fs.listStatus(path);
+ }
+
+ /** Delegates {@code mkdirs} to the FileSystem that owns the path. */
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   return fs.mkdirs(path);
+ }
+
+ /** Delegates {@code open} to the FileSystem that owns the path. */
+ @Override
+ public FSDataInputStream open(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   return fs.open(path);
+ }
+
+ /**
+  * Renames a path within a single volume.
+  *
+  * @throws NotImplementedException
+  *           if the two paths do not resolve to the very same FileSystem object
+  */
+ @Override
+ public boolean rename(Path path, Path newPath) throws IOException {
+   final FileSystem from = getFileSystemByPath(path);
+   final FileSystem to = getFileSystemByPath(newPath);
+   // compared by reference, not by equals()
+   if (from == to) {
+     return from.rename(path, newPath);
+   }
+   throw new NotImplementedException("Cannot rename files across volumes: " + path + " -> " + newPath);
+ }
+
+ /** Moves the path to the trash of the FileSystem that owns it, using that file system's own configuration. */
+ @Override
+ public boolean moveToTrash(Path path) throws IOException {
+   final FileSystem owner = getFileSystemByPath(path);
+   return new Trash(owner, owner.getConf()).moveToTrash(path);
+ }
+
+ /** Returns the default replication reported by the FileSystem that owns the path. */
+ @Override
+ public short getDefaultReplication(Path path) {
+   final FileSystem fs = getFileSystemByPath(path);
+   @SuppressWarnings("deprecation")
+   final short replication = fs.getDefaultReplication();
+   return replication;
+ }
+
+ /** Delegates {@code isFile} to the FileSystem that owns the path. */
+ @Override
+ public boolean isFile(Path path) throws IOException {
+   final FileSystem fs = getFileSystemByPath(path);
+   return fs.isFile(path);
+ }
+
+ /** Builds a manager from the system configuration of the HdfsZooInstance. */
+ public static VolumeManager get() throws IOException {
+   return get(ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance()));
+ }
+
+ static private final String DEFAULT = "";
+
+ /**
+  * Builds a manager from the given configuration.
+  *
+  * The default volume is registered under the empty name. Each comma-separated entry of the instance volumes property is added as a further
+  * volume: entries containing ':' are resolved as paths, all others map to the file system returned by {@code FileSystem.get}.
+  *
+  * @throws IllegalArgumentException
+  *           if the instance volumes property contains an empty entry, which would collide with the default volume's name
+  */
+ public static VolumeManager get(AccumuloConfiguration conf) throws IOException {
+   Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>();
+   Configuration hadoopConf = CachedConfiguration.getInstance();
+   fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf));
+   String ns = conf.get(Property.INSTANCE_VOLUMES);
+   if (ns != null && !ns.isEmpty()) {
+     for (String space : ns.split(",")) {
+       if (space.equals(DEFAULT)) {
+         // say which property is malformed instead of failing with a bare exception
+         throw new IllegalArgumentException("Empty volume name in " + Property.INSTANCE_VOLUMES.getKey() + ": '" + ns + "'");
+       }
+
+       if (space.contains(":")) {
+         fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
+       } else {
+         fileSystems.put(space, FileSystem.get(hadoopConf));
+       }
+     }
+   }
+   return new VolumeManagerImpl(fileSystems, DEFAULT, conf);
+ }
+
+ @Override
+ public boolean isReady() throws IOException {
+ for (FileSystem fs : getFileSystems().values()) {
+ if (!(fs instanceof DistributedFileSystem))
+ continue;
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+ // Becomes this:
+ Class<?> safeModeAction;
+ try {
+ // hadoop 2.0
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0
+ try {
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot figure out the right class for Constants");
+ }
+ }
+ Object get = null;
+ for (Object obj : safeModeAction.getEnumConstants()) {
+ if (obj.toString().equals("SAFEMODE_GET"))
+ get = obj;
+ }
+ if (get == null) {
+ throw new RuntimeException("cannot find SAFEMODE_GET");
+ }
+ try {
+ Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+ boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
+ if (inSafeMode) {
+ return false;
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException("cannot find method setSafeMode");
+ }
+ }
+ return true;
+ }
+
+ /** Returns the FileSystem registered under the default volume name. */
+ @Override
+ public FileSystem getDefaultVolume() {
+   final FileSystem defaultFs = volumes.get(defaultVolume);
+   return defaultFs;
+ }
+
+ /** Delegates {@code globStatus} to the FileSystem selected for the pattern. */
+ @Override
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+   final FileSystem fs = getFileSystemByPath(pathPattern);
+   return fs.globStatus(pathPattern);
+ }
+
+ /**
+  * Converts a metadata entry into a path: the column qualifier supplies the relative path and the row supplies the table id.
+  *
+  * @param key
+  *          the metadata key to convert
+  * @return the result of {@code getFullPath(tableId, relPath)}
+  */
+ @Override
+ public Path getFullPath(Key key) {
+   // TODO sanity check col fam
+   String relPath = key.getColumnQualifierData().toString();
+   byte[] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
+   // Decode with an explicit charset rather than the platform default.
+   // NOTE(review): UTF-8 is assumed because the row is a hadoop Text - confirm.
+   return getFullPath(new String(tableId, java.nio.charset.Charset.forName("UTF-8")), relPath);
+ }
+
+ /**
+  * Finds the first option that has the same URI scheme and authority as the source path.
+  *
+  * A missing scheme or authority only matches another missing one, so a source path without a scheme no longer causes a
+  * NullPointerException.
+  *
+  * @param source
+  *          the path whose volume is wanted
+  * @param options
+  *          candidate locations, each parsed as a URI
+  * @return the matching option as a Path, or {@code null} if none matches
+  */
+ @Override
+ public Path matchingFileSystem(Path source, String[] options) {
+   final URI sourceUri = source.toUri();
+   final String scheme = sourceUri.getScheme();
+   final String authority = sourceUri.getAuthority();
+   for (String option : options) {
+     final URI optionUri = URI.create(option);
+     if (nullSafeEquals(scheme, optionUri.getScheme()) && nullSafeEquals(authority, optionUri.getAuthority()))
+       return new Path(option);
+   }
+   return null;
+ }
+
+ /** Compares two strings, treating two nulls as equal. */
+ private static boolean nullSafeEquals(String a, String b) {
+   return a == null ? b == null : a.equals(b);
+ }
+
+ @Override
+ public Path getFullPath(String tableId, String path) {
+ if (path.contains(":"))
+ return new Path(path);
+
+ if (path.startsWith("../"))
+ path = path.substring(2);
+ else if (path.startsWith("/"))
+ path = "/" + tableId + path;
+ else
+ throw new IllegalArgumentException("Unexpected path prefix " + path);
+
+ return getFullPath(FileType.TABLE, path);
+ }
+
+ /**
+  * Resolves a file name of the given type to a qualified path.
+  *
+  * A path containing ':' is returned as is; otherwise it is placed under the file type's directory inside the default base directory and
+  * qualified by the FileSystem selected for it.
+  */
+ @Override
+ public Path getFullPath(FileType fileType, String path) {
+   if (path.contains(":"))
+     return new Path(path);
+
+   // normalize the path: a single leading slash is dropped so that it resolves below the type directory
+   final Path typeDir = new Path(ServerConstants.getDefaultBaseDir(), fileType.getDirectory());
+   final String relative = path.startsWith("/") ? path.substring(1) : path;
+   final Path fullPath = new Path(typeDir, relative);
+
+   return getFileSystemByPath(fullPath).makeQualified(fullPath);
+ }
+
+ /** Delegates {@code getContentSummary} to the FileSystem that owns the directory. */
+ @Override
+ public ContentSummary getContentSummary(Path dir) throws IOException {
+   final FileSystem fs = getFileSystemByPath(dir);
+   return fs.getContentSummary(dir);
+ }
+
+ /** Lets the configured volume chooser pick one of the options. */
+ @Override
+ public String choose(String[] options) {
+   final String chosen = chooser.choose(options);
+   return chosen;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
new file mode 100644
index 0000000..0477a44
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.init;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import jline.console.ConsoleReader;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.constraints.MetadataConstraints;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.util.TablePropUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * This class is used to setup the directory structure and the root tablet to get an instance started
+ *
+ */
+public class Initialize {
+ private static final Logger log = Logger.getLogger(Initialize.class);
+ private static final String DEFAULT_ROOT_USER = "root";
+ public static final String TABLE_TABLETS_TABLET_DIR = "/table_info";
+
+ private static ConsoleReader reader = null;
+
+ /** Returns the shared console reader, creating it on first use. */
+ private static ConsoleReader getConsoleReader() throws IOException {
+   if (reader != null) {
+     return reader;
+   }
+   reader = new ConsoleReader();
+   return reader;
+ }
+
+ private static HashMap<String,String> initialMetadataConf = new HashMap<String,String>();
+ static {
+ initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K");
+ initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
+ initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true");
+ initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
+ initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M");
+ initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName());
+ initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", "10," + VersioningIterator.class.getName());
+ initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", "1");
+ initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", "10," + VersioningIterator.class.getName());
+ initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1");
+ initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName());
+ initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1");
+ initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName());
+ initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
+ initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet",
+ String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME));
+ initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", String.format("%s,%s,%s,%s", DataFileColumnFamily.NAME,
+ LogColumnFamily.NAME, TabletsSection.ServerColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME));
+ initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server");
+ initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), "");
+ initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true");
+ initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
+ }
+
+ public static boolean doInit(Opts opts, Configuration conf, VolumeManager fs) throws IOException {
+ if (!ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI).equals(""))
+ log.info("Hadoop Filesystem is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI));
+ else
+ log.info("Hadoop Filesystem is " + FileSystem.getDefaultUri(conf));
+
+ log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs()));
+ log.info("Zookeeper server is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST));
+ log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running");
+ if (!zookeeperAvailable()) {
+ log.fatal("Zookeeper needs to be up and running in order to init. Exiting ...");
+ return false;
+ }
+ if (ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_SECRET).equals(Property.INSTANCE_SECRET.getDefaultValue())) {
+ ConsoleReader c = getConsoleReader();
+ c.beep();
+ c.println();
+ c.println();
+ c.println("Warning!!! Your instance secret is still set to the default, this is not secure. We highly recommend you change it.");
+ c.println();
+ c.println();
+ c.println("You can change the instance secret in accumulo by using:");
+ c.println(" bin/accumulo " + org.apache.accumulo.server.util.ChangeSecret.class.getName() + " oldPassword newPassword.");
+ c.println("You will also need to edit your secret in your configuration file by adding the property instance.secret to your conf/accumulo-site.xml. Without this accumulo will not operate correctly");
+ }
+
+ try {
+ if (isInitialized(fs)) {
+ log.fatal("It appears this location was previously initialized, exiting ... ");
+ return false;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // prompt user for instance name and root password early, in case they
+ // abort, we don't leave an inconsistent HDFS/ZooKeeper structure
+ String instanceNamePath;
+ try {
+ instanceNamePath = getInstanceNamePath(opts);
+ } catch (Exception e) {
+ log.fatal("Failed to talk to zookeeper", e);
+ return false;
+ }
+ opts.rootpass = getRootPassword(opts);
+ return initialize(opts, instanceNamePath, fs);
+ }
+
+ /**
+  * Performs the three initialization steps in order: ZooKeeper, file system, security.
+  *
+  * A fresh random UUID is generated and passed to every step. A failing step is logged as fatal and stops the sequence.
+  *
+  * @return {@code true} if all three steps completed, {@code false} as soon as one of them threw
+  */
+ public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) {
+
+   final UUID uuid = UUID.randomUUID();
+   final String instanceId = uuid.toString();
+   // the actual disk locations of the root table and tablets
+   final Path rootTablet = new Path(fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
+
+   try {
+     initZooKeeper(opts, instanceId, instanceNamePath, rootTablet);
+   } catch (Exception e) {
+     log.fatal("Failed to initialize zookeeper", e);
+     return false;
+   }
+
+   try {
+     initFileSystem(opts, fs, uuid, rootTablet);
+   } catch (Exception e) {
+     log.fatal("Failed to initialize filesystem", e);
+     return false;
+   }
+
+   try {
+     initSecurity(opts, instanceId);
+   } catch (Exception e) {
+     log.fatal("Failed to initialize security", e);
+     return false;
+   }
+
+   return true;
+ }
+
+ /**
+  * Probes ZooKeeper by checking whether the root node "/" exists.
+  *
+  * @return the result of the existence check, or false if the check failed with a
+  *         {@link KeeperException} or the calling thread was interrupted
+  */
+ private static boolean zookeeperAvailable() {
+   IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+   try {
+     return zoo.exists("/");
+   } catch (KeeperException e) {
+     return false;
+   } catch (InterruptedException e) {
+     // restore the interrupt status so callers further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     return false;
+   }
+ }
+
+ /**
+  * Wraps each string in a {@link Path}, preserving order.
+  *
+  * @param paths
+  *          path strings to convert
+  * @return an array of the same length holding one Path per input string
+  */
+ private static Path[] paths(String[] paths) {
+   Path[] converted = new Path[paths.length];
+   int next = 0;
+   for (String location : paths) {
+     converted[next++] = new Path(location);
+   }
+   return converted;
+ }
+
+ /**
+  * Lays out the initial on-disk structure: the data version and instance id markers, the metadata
+  * table directories, the root tablet directory with its initial file describing the two metadata
+  * tablets, and finally the directories of those two tablets. Also stores the initial metadata
+  * configuration via {@link #initMetadataConfig()}.
+  *
+  * Any directory that cannot be created, or that exists as a non-directory, now aborts the method
+  * with an IOException. Previously the problem was only logged and the method returned normally,
+  * which let the caller continue as if the filesystem had been initialized.
+  *
+  * @param opts
+  *          initialization options (not read by this method)
+  * @param fs
+  *          volume manager used for all filesystem operations
+  * @param uuid
+  *          instance id; an empty file with this name is created in the instance id location
+  * @param rootTablet
+  *          directory of the root tablet
+  * @throws IOException
+  *           if a filesystem operation fails or a required directory cannot be established
+  */
+ private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, Path rootTablet) throws IOException {
+   // the actual disk locations of the metadata table and tablets
+   final Path[] metadataTableDirs = paths(ServerConstants.getMetadataTableDirs());
+
+   String tableMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR));
+   String defaultMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION));
+
+   fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + ServerConstants.DATA_VERSION));
+
+   // create an instance id
+   fs.mkdirs(ServerConstants.getInstanceIdLocation());
+   fs.createNewFile(new Path(ServerConstants.getInstanceIdLocation(), uuid.toString()));
+
+   // initialize initial metadata config in zookeeper
+   initMetadataConfig();
+
+   // create metadata table
+   for (Path mtd : metadataTableDirs) {
+     ensureDirectory(fs, mtd);
+   }
+
+   // create root table and tablet
+   ensureDirectory(fs, rootTablet);
+
+   // populate the root tablet with info about the default tablet
+   // the root tablet contains the key extent and locations of all the
+   // metadata tablets
+   String initRootTabFile = rootTablet + "/00000_00000." + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
+   FileSystem ns = fs.getFileSystemByPath(new Path(initRootTabFile));
+   FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, ns, ns.getConf(), AccumuloConfiguration.getDefaultConfiguration());
+   mfw.startDefaultLocalityGroup();
+
+   Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), MetadataSchema.TabletsSection.getRange().getEndKey().getRow()));
+
+   // table tablet's directory
+   Key tableDirKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
+       TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+   mfw.append(tableDirKey, new Value(tableMetadataTabletDir.getBytes()));
+
+   // table tablet time
+   Key tableTimeKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(),
+       TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0);
+   mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+
+   // table tablet's prevrow
+   Key tablePrevRowKey = new Key(tableExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
+       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0);
+   mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(null));
+
+   // ----------] default tablet info
+   Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null));
+
+   // default's directory
+   Key defaultDirKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
+       TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+   mfw.append(defaultDirKey, new Value(defaultMetadataTabletDir.getBytes()));
+
+   // default's time
+   Key defaultTimeKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(),
+       TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0);
+   mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+
+   // default's prevrow
+   Key defaultPrevRowKey = new Key(defaultExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
+       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0);
+   mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(MetadataSchema.TabletsSection.getRange().getEndKey().getRow()));
+
+   mfw.close();
+
+   // create table and default tablets directories
+   for (String s : Arrays.asList(tableMetadataTabletDir, defaultMetadataTabletDir)) {
+     ensureDirectory(fs, new Path(s));
+   }
+ }
+
+ /**
+  * Makes sure {@code dir} exists as a directory, creating it when it is missing.
+  *
+  * @param fs
+  *          volume manager used to inspect and create the directory
+  * @param dir
+  *          the directory that must exist when this method returns
+  * @throws IOException
+  *           if {@code dir} exists but is not a directory, or if it is missing and cannot be created
+  */
+ // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
+ @SuppressWarnings("deprecation")
+ private static void ensureDirectory(VolumeManager fs, Path dir) throws IOException {
+   try {
+     FileStatus fstat = fs.getFileStatus(dir);
+     if (!fstat.isDir()) {
+       // a plain IOException is not a FileNotFoundException, so it escapes the catch below
+       throw new IOException("location " + dir.toString() + " exists but is not a directory");
+     }
+   } catch (FileNotFoundException fnfe) {
+     if (!fs.mkdirs(dir)) {
+       throw new IOException("unable to create directory " + dir.toString());
+     }
+   }
+ }
+
+ /**
+  * Creates the initial ZooKeeper nodes for a new instance: the shared root and instances nodes,
+  * the instance-name entry pointing at the instance id, and the nodes under the instance's own
+  * root (Constants.ZROOT + "/" + uuid).
+  *
+  * @param opts
+  *          only clearInstanceName is read here; when set, any existing instance-name entry is deleted first
+  * @param uuid
+  *          the instance id, stored as the data of the instance-name node and used to build the instance root path
+  * @param instanceNamePath
+  *          ZooKeeper path of the instance-name node
+  * @param rootTablet
+  *          root tablet directory; its string form is stored at the ZROOT_TABLET_PATH node
+  * @throws KeeperException
+  *           declared for the ZooKeeper calls below (e.g. when a node created with NodeExistsPolicy.FAIL already exists — TODO confirm)
+  * @throws InterruptedException
+  *           declared for the ZooKeeper calls below
+  */
+ private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath, Path rootTablet) throws KeeperException, InterruptedException {
+ // setup basic data in zookeeper
+ // these two nodes use NodeExistsPolicy.SKIP, unlike every node created further down
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
+ ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT + Constants.ZINSTANCES, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
+
+ // setup instance name
+ if (opts.clearInstanceName)
+ zoo.recursiveDelete(instanceNamePath, NodeMissingPolicy.SKIP);
+ zoo.putPersistentData(instanceNamePath, uuid.getBytes(), NodeExistsPolicy.FAIL);
+
+ // setup the instance
+ // NOTE(review): several paths below look like children of nodes created just before them
+ // (e.g. ZMASTER_LOCK after ZMASTERS, ZGC_LOCK after ZGC) — presumably the order matters; confirm before reordering
+ String zkInstanceRoot = Constants.ZROOT + "/" + uuid;
+ zoo.putPersistentData(zkInstanceRoot, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID, NodeExistsPolicy.FAIL);
+ TableManager.prepareNewTableState(uuid, RootTable.ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
+ TableManager.prepareNewTableState(uuid, MetadataTable.ID, MetadataTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTablet.toString().getBytes(), NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(), NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_LOCKS, new byte[0], NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, new byte[0], NodeExistsPolicy.FAIL);
+ // the next two nodes start with the single character '0' as their data
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, new byte[] {'0'}, NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, new byte[] {'0'}, NodeExistsPolicy.FAIL);
+ }
+
+ private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {
+ // setup the instance name
+ String instanceName, instanceNamePath = null;
+ boolean exists = true;
+ do {
+ if (opts.cliInstanceName == null) {
+ instanceName = getConsoleReader().readLine("Instance name : ");
+ } else {
+ instanceName = opts.cliInstanceName;
+ }
+ if (instanceName == null)
+ System.exit(0);
+ instanceName = instanceName.trim();
+ if (instanceName.length() == 0)
+ continue;
+ instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
+ if (opts.clearInstanceName) {
+ exists = false;
+ break;
+ } else if (exists = ZooReaderWriter.getInstance().exists(instanceNamePath)) {
+ String decision = getConsoleReader().readLine("Instance name \"" + instanceName + "\" exists. Delete existing entry from zookeeper? [Y/N] : ");
+ if (decision == null)
+ System.exit(0);
+ if (decision.length() == 1 && decision.toLowerCase(Locale.ENGLISH).charAt(0) == 'y') {
+ opts.clearInstanceName = true;
+ exists = false;
+ }
+ }
+ } while (exists);
+ return instanceNamePath;
+ }
+
+ /**
+  * Obtains the initial root password. When {@code opts.cliPassword} is set its bytes are returned
+  * directly; otherwise the password is read twice from the console (echoing '*') until both
+  * entries match. A null read from the console exits the JVM with status 0.
+  *
+  * @param opts
+  *          supplies the optional command-line password
+  * @return the password bytes, as produced by {@code String.getBytes()}
+  */
+ private static byte[] getRootPassword(Opts opts) throws IOException {
+   if (opts.cliPassword != null) {
+     return opts.cliPassword.getBytes();
+   }
+
+   while (true) {
+     String entered = getConsoleReader().readLine(
+         "Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*');
+     if (entered == null)
+       System.exit(0);
+
+     String repeated = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*');
+     if (repeated == null)
+       System.exit(0);
+
+     if (entered.equals(repeated)) {
+       return entered.getBytes();
+     }
+     log.error("Passwords do not match");
+   }
+ }
+
+ /**
+  * Initializes security for the instance with id {@code iid}: calls initializeSecurity on the
+  * AuditedSecurityOperation for that instance, passing the system credentials (converted to
+  * their thrift form), DEFAULT_ROOT_USER and the root password held in {@code opts.rootpass}.
+  *
+  * @param opts
+  *          must already carry the root password in rootpass
+  * @param iid
+  *          the instance id
+  */
+ private static void initSecurity(Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException {
+ AuditedSecurityOperation.getInstance(iid, true).initializeSecurity(SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()), DEFAULT_ROOT_USER,
+ opts.rootpass);
+ }
+
+ /**
+  * Stores the initial metadata properties (the entries of initialMetadataConf) as per-table
+  * properties of the table identified by {@code tableId}.
+  *
+  * Before doing so, the HDFS replication limits are read from the Hadoop configuration. If the
+  * maximum is below 5 or the minimum is above 5, {@code setMetadataReplication} is called, which
+  * prompts on the console and records the chosen replication in initialMetadataConf.
+  *
+  * The {@code tableId} argument was previously ignored and the properties were always written to
+  * both the root and the metadata table; now only the requested table is written.
+  *
+  * @param tableId
+  *          id of the table that receives the properties
+  * @throws IOException
+  *           if a property cannot be set, or wrapping any other exception raised while doing so
+  */
+ public static void initMetadataConfig(String tableId) throws IOException {
+   try {
+     Configuration conf = CachedConfiguration.getInstance();
+     int max = conf.getInt("dfs.replication.max", 512);
+     // Hadoop 0.23 switched the min value configuration name
+     int min = Math.max(conf.getInt("dfs.replication.min", 1), conf.getInt("dfs.namenode.replication.min", 1));
+     if (max < 5) {
+       setMetadataReplication(max, "max");
+     }
+     if (min > 5) {
+       setMetadataReplication(min, "min");
+     }
+     for (Entry<String,String> entry : initialMetadataConf.entrySet()) {
+       if (!TablePropUtil.setTableProperty(tableId, entry.getKey(), entry.getValue())) {
+         throw new IOException("Cannot create per-table property " + entry.getKey());
+       }
+     }
+   } catch (Exception e) {
+     log.fatal("error talking to zookeeper", e);
+     throw new IOException(e);
+   }
+ }
+
+ protected static void initMetadataConfig() throws IOException {
+ initMetadataConfig(RootTable.ID);
+ initMetadataConfig(MetadataTable.ID);
+ }
+
+ private static void setMetadataReplication(int replication, String reason) throws IOException {
+ String rep = getConsoleReader().readLine(
+ "Your HDFS replication " + reason + " is not compatible with our default " + MetadataTable.NAME + " replication of 5. What do you want to set your "
+ + MetadataTable.NAME + " replication to? (" + replication + ") ");
+ if (rep == null || rep.length() == 0)
+ rep = Integer.toString(replication);
+ else
+ // Lets make sure it's a number
+ Integer.parseInt(rep);
+ initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep);
+ }
+
+ /**
+  * Reports whether an instance appears to have been initialized already.
+  *
+  * @param fs
+  *          volume manager used for the existence checks
+  * @return true if the instance id location or the data version location exists
+  */
+ public static boolean isInitialized(VolumeManager fs) throws IOException {
+   if (fs.exists(ServerConstants.getInstanceIdLocation())) {
+     return true;
+   }
+   return fs.exists(ServerConstants.getDataVersionLocation());
+ }
+
+ /**
+  * Command-line options for initialization, parsed in main via parseArgs.
+  */
+ static class Opts extends Help {
+ // when set, main only re-initializes security (on an already initialized instance) instead of running a full init
+ @Parameter(names = "--reset-security", description = "just update the security information")
+ boolean resetSecurity = false;
+ // may also be switched on at runtime when the user agrees to delete an existing instance name
+ @Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting")
+ boolean clearInstanceName = false;
+ // null means the instance name is read from the console
+ @Parameter(names = "--instance-name", description = "the instance name, if not provided, will prompt")
+ String cliInstanceName;
+ // null means the root password is read from the console
+ @Parameter(names = "--password", description = "set the password on the command line")
+ String cliPassword;
+
+ // not a command-line parameter: filled in from getRootPassword and read by initSecurity
+ byte[] rootpass = null;
+ }
+
+ /**
+  * Command-line entry point. Parses the options, logs in, and then either runs a full
+  * initialization or, with --reset-security, re-initializes security only.
+  *
+  * A failed full initialization exits the JVM with status -1. Resetting security on an
+  * instance that is not initialized is only logged. Any exception is logged and rethrown
+  * wrapped in a RuntimeException.
+  *
+  * @param args
+  *          command-line arguments, see {@link Opts}
+  */
+ public static void main(String[] args) {
+   Opts opts = new Opts();
+   opts.parseArgs(Initialize.class.getName(), args);
+
+   try {
+     SecurityUtil.serverLogin();
+     Configuration conf = CachedConfiguration.getInstance();
+
+     @SuppressWarnings("deprecation")
+     VolumeManager fs = VolumeManagerImpl.get(SiteConfiguration.getSiteConfiguration());
+
+     if (!opts.resetSecurity) {
+       // regular path: full initialization
+       if (!doInit(opts, conf, fs)) {
+         System.exit(-1);
+       }
+     } else if (!isInitialized(fs)) {
+       log.fatal("Attempted to reset security on accumulo before it was initialized");
+     } else {
+       opts.rootpass = getRootPassword(opts);
+       initSecurity(opts, HdfsZooInstance.getInstance().getInstanceID());
+     }
+   } catch (Exception e) {
+     log.fatal(e, e);
+     throw new RuntimeException(e);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
new file mode 100644
index 0000000..d8bcebe
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.log4j.Logger;
+
+/**
+ * A filter for the metadata table that drops bulk load flags whose transaction is no longer active.
+ * Entries outside the bulk file column family, and deleted entries, always pass through. The
+ * filter refuses to be initialized at scan scope.
+ */
+public class MetadataBulkLoadFilter extends Filter {
+  private static Logger log = Logger.getLogger(MetadataBulkLoadFilter.class);
+
+  enum Status {
+    ACTIVE, INACTIVE
+  }
+
+  // transaction id -> status, filled lazily by accept() so each transaction is looked up once
+  Map<Long,Status> bulkTxStatusCache;
+  Arbitrator arbitrator;
+
+  /**
+   * Keeps every entry except bulk load flags that belong to a completed transaction.
+   *
+   * @param k
+   *          key of the entry under consideration
+   * @param v
+   *          value of the entry; for bulk load flags it is parsed as the transaction id
+   * @return false only for a non-deleted bulk-file entry whose transaction is INACTIVE
+   */
+  @Override
+  public boolean accept(Key k, Value v) {
+    boolean isBulkFlag = !k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0;
+    if (!isBulkFlag) {
+      return true;
+    }
+
+    long txid = Long.parseLong(v.toString());
+    Status known = bulkTxStatusCache.get(txid);
+    if (known == null) {
+      known = lookupStatus(txid);
+      bulkTxStatusCache.put(txid, known);
+    }
+    return Status.ACTIVE == known;
+  }
+
+  /**
+   * Asks the arbitrator whether the bulk transaction has completed. If the check throws, the
+   * error is logged and the transaction is treated as ACTIVE, so its flags are kept.
+   */
+  private Status lookupStatus(long txid) {
+    try {
+      boolean complete = arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid);
+      return complete ? Status.INACTIVE : Status.ACTIVE;
+    } catch (Exception e) {
+      log.error(e, e);
+      return Status.ACTIVE;
+    }
+  }
+
+  /**
+   * Initializes the filter with an empty status cache and a fresh arbitrator.
+   *
+   * @throws IOException
+   *           if the iterator scope is scan
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+
+    if (IteratorScope.scan == env.getIteratorScope()) {
+      throw new IOException("This iterator not intended for use at scan time");
+    }
+
+    bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>();
+    arbitrator = getArbitrator();
+  }
+
+  /**
+   * Supplies the arbitrator used to check transaction completion; protected so subclasses can substitute another one.
+   */
+  protected Arbitrator getArbitrator() {
+    return new ZooArbitrator();
+  }
+}