Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:21 UTC
[42/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
new file mode 100644
index 0000000..6979268
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -0,0 +1,1101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.BatchWriterImpl;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.client.impl.Writer;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Provides a reference to the metadata table for updates by tablet servers.
+ */
+public class MetadataTableUtil {
+
+ private static final Text EMPTY_TEXT = new Text();
+ private static Map<Credentials,Writer> root_tables = new HashMap<Credentials,Writer>();
+ private static Map<Credentials,Writer> metadata_tables = new HashMap<Credentials,Writer>();
+ private static final Logger log = Logger.getLogger(MetadataTableUtil.class);
+
+ private static final int SAVE_ROOT_TABLET_RETRIES = 3;
+
+ private MetadataTableUtil() {}
+
+ public synchronized static Writer getMetadataTable(Credentials credentials) {
+ Writer metadataTable = metadata_tables.get(credentials);
+ if (metadataTable == null) {
+ metadataTable = new Writer(HdfsZooInstance.getInstance(), credentials, MetadataTable.ID);
+ metadata_tables.put(credentials, metadataTable);
+ }
+ return metadataTable;
+ }
+
+ public synchronized static Writer getRootTable(Credentials credentials) {
+ Writer rootTable = root_tables.get(credentials);
+ if (rootTable == null) {
+ rootTable = new Writer(HdfsZooInstance.getInstance(), credentials, RootTable.ID);
+ root_tables.put(credentials, rootTable);
+ }
+ return rootTable;
+ }
+
+ public static void putLockID(ZooLock zooLock, Mutation m) {
+ TabletsSection.ServerColumnFamily.LOCK_COLUMN.put(m, new Value(zooLock.getLockID().serialize(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + "/")
+ .getBytes()));
+ }
+
+ public static void update(Credentials credentials, Mutation m, KeyExtent extent) {
+ update(credentials, null, m, extent);
+ }
+
+ public static void update(Credentials credentials, ZooLock zooLock, Mutation m, KeyExtent extent) {
+ Writer t = extent.isMeta() ? getRootTable(credentials) : getMetadataTable(credentials);
+ if (zooLock != null)
+ putLockID(zooLock, m);
+ while (true) {
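+ // retry indefinitely; failures are logged and retried after a 1s sleep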
+ try {
+ t.update(m);
+ return;
+ } catch (AccumuloException e) {
+ log.error(e, e);
+ } catch (AccumuloSecurityException e) {
+ log.error(e, e);
+ } catch (ConstraintViolationException e) {
+ log.error(e, e);
+ } catch (TableNotFoundException e) {
+ log.error(e, e);
+ }
+ UtilWaitThread.sleep(1000);
+ }
+
+ }
+
+ public static void updateTabletFlushID(KeyExtent extent, long flushID, Credentials credentials, ZooLock zooLock) {
+ if (!extent.isRootTablet()) {
+ Mutation m = new Mutation(extent.getMetadataEntry());
+ TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m, new Value((flushID + "").getBytes()));
+ update(credentials, zooLock, m, extent);
+ }
+ }
+
+ public static void updateTabletCompactID(KeyExtent extent, long compactID, Credentials credentials, ZooLock zooLock) {
+ if (!extent.isRootTablet()) {
+ Mutation m = new Mutation(extent.getMetadataEntry());
+ TabletsSection.ServerColumnFamily.COMPACT_COLUMN.put(m, new Value((compactID + "").getBytes()));
+ update(credentials, zooLock, m, extent);
+ }
+ }
+
+ public static void updateTabletDataFile(long tid, KeyExtent extent, Map<FileRef,DataFileValue> estSizes, String time, Credentials credentials, ZooLock zooLock) {
+ Mutation m = new Mutation(extent.getMetadataEntry());
+ byte[] tidBytes = Long.toString(tid).getBytes();
+
+ for (Entry<FileRef,DataFileValue> entry : estSizes.entrySet()) {
+ Text file = entry.getKey().meta();
+ m.put(DataFileColumnFamily.NAME, file, new Value(entry.getValue().encode()));
+ m.put(TabletsSection.BulkFileColumnFamily.NAME, file, new Value(tidBytes));
+ }
+ TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(time.getBytes()));
+ update(credentials, zooLock, m, extent);
+ }
+
+ public static void addTablet(KeyExtent extent, String path, Credentials credentials, char timeType, ZooLock lock) {
+ Mutation m = extent.getPrevRowUpdateMutation();
+
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(path.getBytes()));
+ TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value((timeType + "0").getBytes()));
+
+ update(credentials, lock, m, extent);
+ }
+
+ public static void updateTabletPrevEndRow(KeyExtent extent, Credentials credentials) {
+ Mutation m = extent.getPrevRowUpdateMutation();
+ update(credentials, m, extent);
+ }
+
+ /**
+ * Convenience method for reading entries from the metadata table.
+ */
+ public static SortedMap<KeyExtent,Text> getMetadataDirectoryEntries(SortedMap<Key,Value> entries) {
+ Key key;
+ Value val;
+ Text datafile = null;
+ Value prevRow = null;
+ KeyExtent ke;
+
+ SortedMap<KeyExtent,Text> results = new TreeMap<KeyExtent,Text>();
+
+ Text lastRowFromKey = new Text();
+
+ // the Text objects below are reused in the loop for efficiency
+ Text colf = new Text();
+ Text colq = new Text();
+
+ for (Entry<Key,Value> entry : entries.entrySet()) {
+ key = entry.getKey();
+ val = entry.getValue();
+
+ if (key.compareRow(lastRowFromKey) != 0) {
+ prevRow = null;
+ datafile = null;
+ key.getRow(lastRowFromKey);
+ }
+
+ colf = key.getColumnFamily(colf);
+ colq = key.getColumnQualifier(colq);
+
+ // interpret the row id as a key extent
+ if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.equals(colf, colq))
+ datafile = new Text(val.toString());
+
+ else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(colf, colq))
+ prevRow = new Value(val);
+
+ if (datafile != null && prevRow != null) {
+ ke = new KeyExtent(key.getRow(), prevRow);
+ results.put(ke, datafile);
+
+ datafile = null;
+ prevRow = null;
+ }
+ }
+ return results;
+ }
+
+ public static boolean recordRootTabletLocation(String address) {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ for (int i = 0; i < SAVE_ROOT_TABLET_RETRIES; i++) {
+ try {
+ log.info("trying to write root tablet location to ZooKeeper as " + address);
+ String zRootLocPath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_LOCATION;
+ zoo.putPersistentData(zRootLocPath, address.getBytes(), NodeExistsPolicy.OVERWRITE);
+ return true;
+ } catch (Exception e) {
+ log.error("Master: unable to save root tablet location in zookeeper. exception: " + e, e);
+ }
+ }
+ log.error("Giving up after " + SAVE_ROOT_TABLET_RETRIES + " retries");
+ return false;
+ }
+
+ public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, Credentials credentials) throws IOException {
+ TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
+
+ Scanner mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, MetadataTable.ID, Authorizations.EMPTY);
+ mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ Text row = extent.getMetadataEntry();
+ VolumeManager fs = VolumeManagerImpl.get();
+
+ Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text(""));
+ endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
+
+ mdScanner.setRange(new Range(new Key(row), endKey));
+ for (Entry<Key,Value> entry : mdScanner) {
+
+ if (!entry.getKey().getRow().equals(row))
+ break;
+ DataFileValue dfv = new DataFileValue(entry.getValue().get());
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
+ }
+
+ return sizes;
+ }
+
+ public static void rollBackSplit(Text metadataEntry, Text oldPrevEndRow, Credentials credentials, ZooLock zooLock) {
+ KeyExtent ke = new KeyExtent(metadataEntry, oldPrevEndRow);
+ Mutation m = ke.getPrevRowUpdateMutation();
+ TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.putDelete(m);
+ TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.putDelete(m);
+ update(credentials, zooLock, m, new KeyExtent(metadataEntry, (Text) null));
+ }
+
+ public static void splitTablet(KeyExtent extent, Text oldPrevEndRow, double splitRatio, Credentials credentials, ZooLock zooLock) {
+ Mutation m = extent.getPrevRowUpdateMutation();
+
+ TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value(Double.toString(splitRatio).getBytes()));
+
+ TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(oldPrevEndRow));
+ ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+ update(credentials, zooLock, m, extent);
+ }
+
+ public static void finishSplit(Text metadataEntry, Map<FileRef,DataFileValue> datafileSizes, List<FileRef> highDatafilesToRemove, Credentials credentials,
+ ZooLock zooLock) {
+ Mutation m = new Mutation(metadataEntry);
+ TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.putDelete(m);
+ TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.putDelete(m);
+ ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+ m.put(DataFileColumnFamily.NAME, entry.getKey().meta(), new Value(entry.getValue().encode()));
+ }
+
+ for (FileRef pathToRemove : highDatafilesToRemove) {
+ m.putDelete(DataFileColumnFamily.NAME, pathToRemove.meta());
+ }
+
+ update(credentials, zooLock, m, new KeyExtent(metadataEntry, (Text) null));
+ }
+
+ public static void finishSplit(KeyExtent extent, Map<FileRef,DataFileValue> datafileSizes, List<FileRef> highDatafilesToRemove, Credentials credentials,
+ ZooLock zooLock) {
+ finishSplit(extent.getMetadataEntry(), datafileSizes, highDatafilesToRemove, credentials, zooLock);
+ }
+
+ public static void addDeleteEntries(KeyExtent extent, Set<FileRef> datafilesToDelete, Credentials credentials) throws IOException {
+
+ String tableId = extent.getTableId().toString();
+
+ // TODO could use a batch writer, but would need to handle failure and retry like update() does - ACCUMULO-1294
+ for (FileRef pathToRemove : datafilesToDelete) {
+ update(credentials, createDeleteMutation(tableId, pathToRemove.path().toString()), extent);
+ }
+ }
+
+ public static void addDeleteEntry(String tableId, String path) throws IOException {
+ update(SystemCredentials.get(), createDeleteMutation(tableId, path), new KeyExtent(new Text(tableId), null, null));
+ }
+
+ public static Mutation createDeleteMutation(String tableId, String pathToRemove) throws IOException {
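+ // normalize relative file references to "/<tableId>/<dir>/<file>" form before resolving the
+ // full path; full URIs (containing ':') pass through unchanged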
+ if (!pathToRemove.contains(":")) {
+ if (pathToRemove.startsWith("../"))
+ pathToRemove = pathToRemove.substring(2);
+ else
+ pathToRemove = "/" + tableId + pathToRemove;
+ }
+
+ Path path = VolumeManagerImpl.get().getFullPath(FileType.TABLE, pathToRemove);
+ Mutation delFlag = new Mutation(new Text(MetadataSchema.DeletesSection.getRowPrefix() + path.toString()));
+ delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
+ return delFlag;
+ }
+
+ public static void removeScanFiles(KeyExtent extent, Set<FileRef> scanFiles, Credentials credentials, ZooLock zooLock) {
+ Mutation m = new Mutation(extent.getMetadataEntry());
+
+ for (FileRef pathToRemove : scanFiles)
+ m.putDelete(ScanFileColumnFamily.NAME, pathToRemove.meta());
+
+ update(credentials, zooLock, m, extent);
+ }
+
+ public static void splitDatafiles(Text table, Text midRow, double splitRatio, Map<FileRef,FileUtil.FileInfo> firstAndLastRows,
+ SortedMap<FileRef,DataFileValue> datafiles, SortedMap<FileRef,DataFileValue> lowDatafileSizes, SortedMap<FileRef,DataFileValue> highDatafileSizes,
+ List<FileRef> highDatafilesToRemove) {
+
+ for (Entry<FileRef,DataFileValue> entry : datafiles.entrySet()) {
+
+ Text firstRow = null;
+ Text lastRow = null;
+
+ boolean rowsKnown = false;
+
+ FileUtil.FileInfo mfi = firstAndLastRows.get(entry.getKey());
+
+ if (mfi != null) {
+ firstRow = mfi.getFirstRow();
+ lastRow = mfi.getLastRow();
+ rowsKnown = true;
+ }
+
+ if (rowsKnown && firstRow.compareTo(midRow) > 0) {
+ // only in high
+ long highSize = entry.getValue().getSize();
+ long highEntries = entry.getValue().getNumEntries();
+ highDatafileSizes.put(entry.getKey(), new DataFileValue(highSize, highEntries, entry.getValue().getTime()));
+ } else if (rowsKnown && lastRow.compareTo(midRow) <= 0) {
+ // only in low
+ long lowSize = entry.getValue().getSize();
+ long lowEntries = entry.getValue().getNumEntries();
+ lowDatafileSizes.put(entry.getKey(), new DataFileValue(lowSize, lowEntries, entry.getValue().getTime()));
+
+ highDatafilesToRemove.add(entry.getKey());
+ } else {
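+ // rows unknown, or the file straddles midRow: apportion the size estimate by splitRatio,
+ // e.g. size=1000 at ratio 0.3 gives low=floor(300)=300 and high=ceil(700)=700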
+ long lowSize = (long) Math.floor((entry.getValue().getSize() * splitRatio));
+ long lowEntries = (long) Math.floor((entry.getValue().getNumEntries() * splitRatio));
+ lowDatafileSizes.put(entry.getKey(), new DataFileValue(lowSize, lowEntries, entry.getValue().getTime()));
+
+ long highSize = (long) Math.ceil((entry.getValue().getSize() * (1.0 - splitRatio)));
+ long highEntries = (long) Math.ceil((entry.getValue().getNumEntries() * (1.0 - splitRatio)));
+ highDatafileSizes.put(entry.getKey(), new DataFileValue(highSize, highEntries, entry.getValue().getTime()));
+ }
+ }
+ }
+
+ public static void deleteTable(String tableId, boolean insertDeletes, Credentials credentials, ZooLock lock) throws AccumuloException, IOException {
+ Scanner ms = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, MetadataTable.ID, Authorizations.EMPTY);
+ Text tableIdText = new Text(tableId);
+ BatchWriter bw = new BatchWriterImpl(HdfsZooInstance.getInstance(), credentials, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000)
+ .setMaxLatency(120000L, TimeUnit.MILLISECONDS).setMaxWriteThreads(2));
+
+ // scan metadata for our table and delete everything we find
+ Mutation m = null;
+ ms.setRange(new KeyExtent(tableIdText, null, null).toMetadataRange());
+
+ // insert deletes before deleting data from !METADATA... this makes the code fault-tolerant
+ if (insertDeletes) {
+
+ ms.fetchColumnFamily(DataFileColumnFamily.NAME);
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms);
+
+ for (Entry<Key,Value> cell : ms) {
+ Key key = cell.getKey();
+
+ if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
+ bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
+ }
+
+ if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+ bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString()));
+ }
+ }
+
+ bw.flush();
+
+ ms.clearColumns();
+ }
+
+ for (Entry<Key,Value> cell : ms) {
+ Key key = cell.getKey();
+
+ if (m == null) {
+ m = new Mutation(key.getRow());
+ if (lock != null)
+ putLockID(lock, m);
+ }
+
+ if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) {
+ bw.addMutation(m);
+ m = new Mutation(key.getRow());
+ if (lock != null)
+ putLockID(lock, m);
+ }
+ m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+ }
+
+ if (m != null)
+ bw.addMutation(m);
+
+ bw.close();
+ }
+
+ public static class LogEntry {
+ public KeyExtent extent;
+ public long timestamp;
+ public String server;
+ public String filename;
+ public int tabletId;
+ public Collection<String> logSet;
+
+ @Override
+ public String toString() {
+ return extent.toString() + " " + filename + " (" + tabletId + ")";
+ }
+
+ public String getName() {
+ return server + "/" + filename;
+ }
+
+ public byte[] toBytes() throws IOException {
+ DataOutputBuffer out = new DataOutputBuffer();
+ extent.write(out);
+ out.writeLong(timestamp);
+ out.writeUTF(server);
+ out.writeUTF(filename);
+ out.write(tabletId);
+ out.write(logSet.size());
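+ // note: write(int) emits only the low-order byte; fromBytes() mirrors this with single-byte
+ // read() calls, so tabletId and the log-set size must each fit in one byte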
+ for (String s : logSet) {
+ out.writeUTF(s);
+ }
+ return Arrays.copyOf(out.getData(), out.getLength());
+ }
+
+ public void fromBytes(byte bytes[]) throws IOException {
+ DataInputBuffer inp = new DataInputBuffer();
+ inp.reset(bytes, bytes.length);
+ extent = new KeyExtent();
+ extent.readFields(inp);
+ timestamp = inp.readLong();
+ server = inp.readUTF();
+ filename = inp.readUTF();
+ tabletId = inp.read();
+ int count = inp.read();
+ ArrayList<String> logSet = new ArrayList<String>(count);
+ for (int i = 0; i < count; i++)
+ logSet.add(inp.readUTF());
+ this.logSet = logSet;
+ }
+
+ }
+
+ static String getZookeeperLogLocation() {
+ return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
+ }
+
+ public static void addLogEntry(Credentials credentials, LogEntry entry, ZooLock zooLock) {
+ if (entry.extent.isRootTablet()) {
+ String root = getZookeeperLogLocation();
+ while (true) {
+ try {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ if (zoo.isLockHeld(zooLock.getLockID())) {
+ String[] parts = entry.filename.split("/");
+ String uniqueId = parts[parts.length - 1];
+ zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+ }
+ break;
+ } catch (KeeperException e) {
+ log.error(e, e);
+ } catch (InterruptedException e) {
+ log.error(e, e);
+ } catch (IOException e) {
+ log.error(e, e);
+ }
+ UtilWaitThread.sleep(1000);
+ }
+ } else {
+ String value = StringUtil.join(entry.logSet, ";") + "|" + entry.tabletId;
+ Mutation m = new Mutation(entry.extent.getMetadataEntry());
+ m.put(LogColumnFamily.NAME, new Text(entry.server + "/" + entry.filename), new Value(value.getBytes()));
+ update(credentials, zooLock, m, entry.extent);
+ }
+ }
+
+ public static LogEntry entryFromKeyValue(Key key, Value value) {
+ MetadataTableUtil.LogEntry e = new MetadataTableUtil.LogEntry();
+ e.extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
+ String[] parts = key.getColumnQualifier().toString().split("/", 2);
+ e.server = parts[0];
+ e.filename = parts[1];
+ parts = value.toString().split("\\|");
+ e.tabletId = Integer.parseInt(parts[1]);
+ e.logSet = Arrays.asList(parts[0].split(";"));
+ e.timestamp = key.getTimestamp();
+ return e;
+ }
+
+ public static String getRootTabletDir() throws IOException {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
+ try {
+ return new String(zoo.getData(zpath, null), Constants.UTF8);
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static Pair<List<LogEntry>,SortedMap<FileRef,DataFileValue>> getFileAndLogEntries(Credentials credentials, KeyExtent extent) throws KeeperException,
+ InterruptedException, IOException {
+ ArrayList<LogEntry> result = new ArrayList<LogEntry>();
+ TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
+
+ VolumeManager fs = VolumeManagerImpl.get();
+ if (extent.isRootTablet()) {
+ getRootLogEntries(result);
+ Path rootDir = new Path(getRootTabletDir());
+ FileStatus[] files = fs.listStatus(rootDir);
+ for (FileStatus fileStatus : files) {
+ if (fileStatus.getPath().toString().endsWith("_tmp")) {
+ continue;
+ }
+ DataFileValue dfv = new DataFileValue(0, 0);
+ sizes.put(new FileRef(fileStatus.getPath().toString(), fileStatus.getPath()), dfv);
+ }
+
+ } else {
+ String systemTableToCheck = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
+ Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, systemTableToCheck, Authorizations.EMPTY);
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ scanner.setRange(extent.toMetadataRange());
+
+ for (Entry<Key,Value> entry : scanner) {
+ if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
+ throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry());
+ }
+
+ if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
+ result.add(entryFromKeyValue(entry.getKey(), entry.getValue()));
+ } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ DataFileValue dfv = new DataFileValue(entry.getValue().get());
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
+ } else {
+ throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
+ }
+ }
+ }
+
+ return new Pair<List<LogEntry>,SortedMap<FileRef,DataFileValue>>(result, sizes);
+ }
+
+ public static List<LogEntry> getLogEntries(Credentials credentials, KeyExtent extent) throws IOException, KeeperException, InterruptedException {
+ log.info("Scanning logging entries for " + extent);
+ ArrayList<LogEntry> result = new ArrayList<LogEntry>();
+ if (extent.equals(RootTable.EXTENT)) {
+ log.info("Getting logs for root tablet from zookeeper");
+ getRootLogEntries(result);
+ } else {
+ log.info("Scanning metadata for logs used for tablet " + extent);
+ Scanner scanner = getTabletLogScanner(credentials, extent);
+ Text pattern = extent.getMetadataEntry();
+ for (Entry<Key,Value> entry : scanner) {
+ Text row = entry.getKey().getRow();
+ if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
+ if (row.equals(pattern)) {
+ result.add(entryFromKeyValue(entry.getKey(), entry.getValue()));
+ }
+ }
+ }
+ }
+
+ Collections.sort(result, new Comparator<LogEntry>() {
+ @Override
+ public int compare(LogEntry o1, LogEntry o2) {
+ long diff = o1.timestamp - o2.timestamp;
+ if (diff < 0)
+ return -1;
+ if (diff > 0)
+ return 1;
+ return 0;
+ }
+ });
+ log.info("Returning logs " + result + " for extent " + extent);
+ return result;
+ }
+
+ private static void getRootLogEntries(ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ String root = getZookeeperLogLocation();
+ // there's a little race between getting the children and fetching
+ // the data. The log can be removed in between.
+ while (true) {
+ result.clear();
+ for (String child : zoo.getChildren(root)) {
+ LogEntry e = new LogEntry();
+ try {
+ e.fromBytes(zoo.getData(root + "/" + child, null));
+ // upgrade from !0;!0<< -> !!R<<
+ e.extent = RootTable.EXTENT;
+ result.add(e);
+ } catch (KeeperException.NoNodeException ex) {
+ continue;
+ }
+ }
+ break;
+ }
+ }
+
+ private static Scanner getTabletLogScanner(Credentials credentials, KeyExtent extent) {
+ String tableId = MetadataTable.ID;
+ if (extent.isMeta())
+ tableId = RootTable.ID;
+ Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, tableId, Authorizations.EMPTY);
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ Text start = extent.getMetadataEntry();
+ Key endKey = new Key(start, LogColumnFamily.NAME);
+ endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
+ scanner.setRange(new Range(new Key(start), endKey));
+ return scanner;
+ }
+
+ static class LogEntryIterator implements Iterator<LogEntry> {
+
+ Iterator<LogEntry> zookeeperEntries = null;
+ Iterator<LogEntry> rootTableEntries = null;
+ Iterator<Entry<Key,Value>> metadataEntries = null;
+
+ LogEntryIterator(Credentials creds) throws IOException, KeeperException, InterruptedException {
+ zookeeperEntries = getLogEntries(creds, RootTable.EXTENT).iterator();
+ rootTableEntries = getLogEntries(creds, new KeyExtent(new Text(MetadataTable.ID), null, null)).iterator();
+ try {
+ Scanner scanner = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken())
+ .createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ log.info("Setting range to " + MetadataSchema.TabletsSection.getRange());
+ scanner.setRange(MetadataSchema.TabletsSection.getRange());
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ metadataEntries = scanner.iterator();
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return zookeeperEntries.hasNext() || rootTableEntries.hasNext() || metadataEntries.hasNext();
+ }
+
+ @Override
+ public LogEntry next() {
+ if (zookeeperEntries.hasNext()) {
+ return zookeeperEntries.next();
+ }
+ if (rootTableEntries.hasNext()) {
+ return rootTableEntries.next();
+ }
+ Entry<Key,Value> entry = metadataEntries.next();
+ return entryFromKeyValue(entry.getKey(), entry.getValue());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static Iterator<LogEntry> getLogEntries(Credentials creds) throws IOException, KeeperException, InterruptedException {
+ return new LogEntryIterator(creds);
+ }
+
+ public static void removeUnusedWALEntries(KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) {
+ for (LogEntry entry : logEntries) {
+ if (entry.extent.isRootTablet()) {
+ String root = getZookeeperLogLocation();
+ while (true) {
+ try {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ if (zoo.isLockHeld(zooLock.getLockID()))
+ zoo.recursiveDelete(root + "/" + entry.filename, NodeMissingPolicy.SKIP);
+ break;
+ } catch (Exception e) {
+ log.error(e, e);
+ }
+ UtilWaitThread.sleep(1000);
+ }
+ } else {
+ Mutation m = new Mutation(entry.extent.getMetadataEntry());
+ m.putDelete(LogColumnFamily.NAME, new Text(entry.server + "/" + entry.filename));
+ update(SystemCredentials.get(), zooLock, m, entry.extent);
+ }
+ }
+ }
+
+ private static void getFiles(Set<String> files, Map<Key,Value> tablet, String srcTableId) {
+ for (Entry<Key,Value> entry : tablet.entrySet()) {
+ if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ String cf = entry.getKey().getColumnQualifier().toString();
+ if (srcTableId != null && !cf.startsWith("../") && !cf.contains(":")) {
+ cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
+ }
+ files.add(cf);
+ }
+ }
+ }
+
+ private static Mutation createCloneMutation(String srcTableId, String tableId, Map<Key,Value> tablet) {
+
+ KeyExtent ke = new KeyExtent(tablet.keySet().iterator().next().getRow(), (Text) null);
+ Mutation m = new Mutation(KeyExtent.getMetadataEntry(new Text(tableId), ke.getEndRow()));
+
+ for (Entry<Key,Value> entry : tablet.entrySet()) {
+ if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ String cf = entry.getKey().getColumnQualifier().toString();
+ if (!cf.startsWith("../") && !cf.contains(":"))
+ cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
+ m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue());
+ } else if (entry.getKey().getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)) {
+ m.put(TabletsSection.LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue());
+ } else if (entry.getKey().getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
+ // skip
+ } else {
+ m.put(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier(), entry.getValue());
+ }
+ }
+ return m;
+ }
+
+ private static Scanner createCloneScanner(String tableId, Connector conn) throws TableNotFoundException {
+ Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
+ mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ mscanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+ mscanner.fetchColumnFamily(TabletsSection.LastLocationColumnFamily.NAME);
+ mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
+ TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(mscanner);
+ TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(mscanner);
+ return mscanner;
+ }
+
+ static void initializeClone(String srcTableId, String tableId, Connector conn, BatchWriter bw) throws TableNotFoundException, MutationsRejectedException {
+ TabletIterator ti = new TabletIterator(createCloneScanner(srcTableId, conn), new KeyExtent(new Text(srcTableId), null, null).toMetadataRange(), true, true);
+
+ if (!ti.hasNext())
+ throw new RuntimeException(" table deleted during clone? srcTableId = " + srcTableId);
+
+ while (ti.hasNext())
+ bw.addMutation(createCloneMutation(srcTableId, tableId, ti.next()));
+
+ bw.flush();
+ }
+
+ static int compareEndRows(Text endRow1, Text endRow2) {
+ return new KeyExtent(new Text("0"), endRow1, null).compareTo(new KeyExtent(new Text("0"), endRow2, null));
+ }
+
+ static int checkClone(String srcTableId, String tableId, Connector conn, BatchWriter bw) throws TableNotFoundException, MutationsRejectedException {
+ TabletIterator srcIter = new TabletIterator(createCloneScanner(srcTableId, conn), new KeyExtent(new Text(srcTableId), null, null).toMetadataRange(), true,
+ true);
+ TabletIterator cloneIter = new TabletIterator(createCloneScanner(tableId, conn), new KeyExtent(new Text(tableId), null, null).toMetadataRange(), true, true);
+
+ if (!cloneIter.hasNext() || !srcIter.hasNext())
+ throw new RuntimeException(" table deleted during clone? srcTableId = " + srcTableId + " tableId=" + tableId);
+
+ int rewrites = 0;
+
+ while (cloneIter.hasNext()) {
+ Map<Key,Value> cloneTablet = cloneIter.next();
+ Text cloneEndRow = new KeyExtent(cloneTablet.keySet().iterator().next().getRow(), (Text) null).getEndRow();
+ HashSet<String> cloneFiles = new HashSet<String>();
+
+ boolean cloneSuccessful = false;
+ for (Entry<Key,Value> entry : cloneTablet.entrySet()) {
+ if (entry.getKey().getColumnFamily().equals(ClonedColumnFamily.NAME)) {
+ cloneSuccessful = true;
+ break;
+ }
+ }
+
+ if (!cloneSuccessful)
+ getFiles(cloneFiles, cloneTablet, null);
+
+ List<Map<Key,Value>> srcTablets = new ArrayList<Map<Key,Value>>();
+ Map<Key,Value> srcTablet = srcIter.next();
+ srcTablets.add(srcTablet);
+
+ Text srcEndRow = new KeyExtent(srcTablet.keySet().iterator().next().getRow(), (Text) null).getEndRow();
+
+ int cmp = compareEndRows(cloneEndRow, srcEndRow);
+ if (cmp < 0)
+ throw new TabletIterator.TabletDeletedException("Tablets deleted from src during clone : " + cloneEndRow + " " + srcEndRow);
+
+ HashSet<String> srcFiles = new HashSet<String>();
+ if (!cloneSuccessful)
+ getFiles(srcFiles, srcTablet, srcTableId);
+
+ while (cmp > 0) {
+ srcTablet = srcIter.next();
+ srcTablets.add(srcTablet);
+ srcEndRow = new KeyExtent(srcTablet.keySet().iterator().next().getRow(), (Text) null).getEndRow();
+ cmp = compareEndRows(cloneEndRow, srcEndRow);
+ if (cmp < 0)
+ throw new TabletIterator.TabletDeletedException("Tablets deleted from src during clone : " + cloneEndRow + " " + srcEndRow);
+
+ if (!cloneSuccessful)
+ getFiles(srcFiles, srcTablet, srcTableId);
+ }
+
+ if (cloneSuccessful)
+ continue;
+
+ if (!srcFiles.containsAll(cloneFiles)) {
+ // delete existing cloned tablet entry
+ Mutation m = new Mutation(cloneTablet.keySet().iterator().next().getRow());
+
+ for (Entry<Key,Value> entry : cloneTablet.entrySet()) {
+ Key k = entry.getKey();
+ m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp());
+ }
+
+ bw.addMutation(m);
+
+ for (Map<Key,Value> st : srcTablets)
+ bw.addMutation(createCloneMutation(srcTableId, tableId, st));
+
+ rewrites++;
+ } else {
+ // write out marker that this tablet was successfully cloned
+ Mutation m = new Mutation(cloneTablet.keySet().iterator().next().getRow());
+ m.put(ClonedColumnFamily.NAME, new Text(""), new Value("OK".getBytes()));
+ bw.addMutation(m);
+ }
+ }
+
+ bw.flush();
+ return rewrites;
+ }
+
+ public static void cloneTable(Instance instance, String srcTableId, String tableId, VolumeManager volumeManager) throws Exception {
+
+ Connector conn = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+ while (true) {
+
+ try {
+ initializeClone(srcTableId, tableId, conn, bw);
+
+ // the following loop looks for file changes that occurred during the copy... if files were dereferenced then they could have been GCed
+
+ while (true) {
+ int rewrites = checkClone(srcTableId, tableId, conn, bw);
+
+ if (rewrites == 0)
+ break;
+ }
+
+ bw.flush();
+ break;
+
+ } catch (TabletIterator.TabletDeletedException tde) {
+ // tablets were merged in the src table
+ bw.flush();
+
+ // delete what we have cloned and try again
+ deleteTable(tableId, false, SystemCredentials.get(), null);
+
+ log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again");
+
+ UtilWaitThread.sleep(100);
+ }
+ }
+
+ // delete the clone markers and create directory entries
+ Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
+
+ int dirCount = 0;
+
+ for (Entry<Key,Value> entry : mscanner) {
+ Key k = entry.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
+ String dir = volumeManager.choose(ServerConstants.getTablesDirs()) + "/" + tableId
+ + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, "/c-".getBytes()));
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes()));
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ }
+
+ public static void chopped(KeyExtent extent, ZooLock zooLock) {
+ Mutation m = new Mutation(extent.getMetadataEntry());
+ ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("chopped".getBytes()));
+ update(SystemCredentials.get(), zooLock, m, extent);
+ }
+
+ public static void removeBulkLoadEntries(Connector conn, String tableId, long tid) throws Exception {
+ Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
+ mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ for (Entry<Key,Value> entry : mscanner) {
+ log.debug("Looking at entry " + entry + " with tid " + tid);
+ if (Long.parseLong(entry.getValue().toString()) == tid) {
+ log.debug("deleting entry " + entry);
+ Mutation m = new Mutation(entry.getKey().getRow());
+ m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+ bw.addMutation(m);
+ }
+ }
+ bw.close();
+ }
+
+ public static List<FileRef> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException {
+ List<FileRef> result = new ArrayList<FileRef>();
+ try {
+ VolumeManager fs = VolumeManagerImpl.get();
+ Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY));
+ mscanner.setRange(extent.toMetadataRange());
+ mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+ for (Entry<Key,Value> entry : mscanner) {
+ if (Long.parseLong(entry.getValue().toString()) == tid) {
+ result.add(new FileRef(fs, entry.getKey()));
+ }
+ }
+ return result;
+ } catch (TableNotFoundException ex) {
+ // unlikely
+ throw new RuntimeException("Onos! teh metadata table has vanished!!");
+ }
+ }
+
+ public static Map<FileRef,Long> getBulkFilesLoaded(Credentials credentials, KeyExtent extent) throws IOException {
+ Text metadataRow = extent.getMetadataEntry();
+ Map<FileRef,Long> ret = new HashMap<FileRef,Long>();
+
+ VolumeManager fs = VolumeManagerImpl.get();
+ Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY);
+ scanner.setRange(new Range(metadataRow));
+ scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+ for (Entry<Key,Value> entry : scanner) {
+ Long tid = Long.parseLong(entry.getValue().toString());
+ ret.put(new FileRef(fs, entry.getKey()), tid);
+ }
+ return ret;
+ }
+
+ public static void addBulkLoadInProgressFlag(String path) {
+
+ Mutation m = new Mutation(MetadataSchema.BlipSection.getRowPrefix() + path);
+ m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
+
+ // the dummy KeyExtent forces update() to write to the metadata table, not the root table;
+ // bulk loads into the metadata table itself are not supported, so no flag belongs in the root table
+ update(SystemCredentials.get(), m, new KeyExtent(new Text("anythingNotMetadata"), null, null));
+ }
+
+ public static void removeBulkLoadInProgressFlag(String path) {
+
+ Mutation m = new Mutation(MetadataSchema.BlipSection.getRowPrefix() + path);
+ m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
+
+ // the dummy KeyExtent forces update() to write to the metadata table, not the root table;
+ // bulk loads into the metadata table itself are not supported, so no flag belongs in the root table
+ update(SystemCredentials.get(), m, new KeyExtent(new Text("anythingNotMetadata"), null, null));
+ }
+
+ public static void moveMetaDeleteMarkers(Instance instance, Credentials creds) {
+ // move old delete markers to new location, to standardize table schema between all metadata tables
+ byte[] EMPTY_BYTES = new byte[0];
+ Scanner scanner = new ScannerImpl(instance, creds, RootTable.ID, Authorizations.EMPTY);
+ String oldDeletesPrefix = "!!~del";
+ Range oldDeletesRange = new Range(oldDeletesPrefix, true, "!!~dem", false);
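+ // "!!~dem" is the first possible row after every "!!~del"-prefixed row, so this range covers exactly the old markers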
+ scanner.setRange(oldDeletesRange);
+ for (Entry<Key,Value> entry : scanner) {
+ String row = entry.getKey().getRow().toString();
+ if (row.startsWith(oldDeletesPrefix)) {
+ String filename = row.substring(oldDeletesPrefix.length());
+ // add the new entry first
+ log.info("Moving " + filename + " marker in " + RootTable.NAME);
+ Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + filename);
+ m.put(EMPTY_BYTES, EMPTY_BYTES, EMPTY_BYTES);
+ update(creds, m, RootTable.EXTENT);
+ // remove the old entry
+ m = new Mutation(entry.getKey().getRow());
+ m.putDelete(EMPTY_BYTES, EMPTY_BYTES);
+ update(creds, m, RootTable.OLD_EXTENT);
+ } else {
+ break;
+ }
+ }
+
+ }
+
+ public static SortedMap<Text,SortedMap<ColumnFQ,Value>> getTabletEntries(SortedMap<Key,Value> tabletKeyValues, List<ColumnFQ> columns) {
+ TreeMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries = new TreeMap<Text,SortedMap<ColumnFQ,Value>>();
+
+ HashSet<ColumnFQ> colSet = null;
+ if (columns != null) {
+ colSet = new HashSet<ColumnFQ>(columns);
+ }
+
+ for (Entry<Key,Value> entry : tabletKeyValues.entrySet()) {
+
+ if (columns != null && !colSet.contains(new ColumnFQ(entry.getKey()))) {
+ continue;
+ }
+
+ Text row = entry.getKey().getRow();
+
+ SortedMap<ColumnFQ,Value> colVals = tabletEntries.get(row);
+ if (colVals == null) {
+ colVals = new TreeMap<ColumnFQ,Value>();
+ tabletEntries.put(row, colVals);
+ }
+
+ colVals.put(new ColumnFQ(entry.getKey()), entry.getValue());
+ }
+
+ return tabletEntries;
+ }
+
+ public static void convertRootTabletToRootTable(Instance instance, SystemCredentials systemCredentials) throws KeeperException, InterruptedException {
+ ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ if (zoo.exists(ZooUtil.getRoot(instance) + "/tables/" + RootTable.ID))
+ return;
+ TableManager.prepareNewTableState(instance.getInstanceID(), RootTable.ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
+ }
+
+}
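
For reference, addLogEntry() stores a non-root write-ahead-log entry under LogColumnFamily.NAME with qualifier server + "/" + filename and a value of the form "log1;log2|tabletId", which entryFromKeyValue() parses back out. A minimal round-trip sketch of just that value encoding (plain JDK; the class and method names are illustrative, not part of Accumulo):

    import java.util.Arrays;
    import java.util.List;

    public class LogEntryValueFormat {

      // encode as addLogEntry() does for non-root tablets: "log1;log2|tabletId"
      static String encode(List<String> logSet, int tabletId) {
        StringBuilder sb = new StringBuilder();
        for (String log : logSet) {
          if (sb.length() > 0)
            sb.append(';');
          sb.append(log);
        }
        return sb.append('|').append(tabletId).toString();
      }

      // decode as entryFromKeyValue() does
      static void decode(String value) {
        String[] parts = value.split("\\|");
        int tabletId = Integer.parseInt(parts[1]);
        List<String> logSet = Arrays.asList(parts[0].split(";"));
        System.out.println("tabletId=" + tabletId + " logSet=" + logSet);
      }

      public static void main(String[] args) {
        decode(encode(Arrays.asList("srv1/wal-1", "srv1/wal-2"), 7));
        // prints: tabletId=7 logSet=[srv1/wal-1, srv1/wal-2]
      }
    }
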
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java b/server/base/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
new file mode 100644
index 0000000..cefec22
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ScannerOptions;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.util.MetadataTableUtil.LogEntry;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class OfflineMetadataScanner extends ScannerOptions implements Scanner {
+
+ private Set<String> allFiles = new HashSet<String>();
+ private Range range = new Range();
+ private final VolumeManager fs;
+ private final AccumuloConfiguration conf;
+
+ private List<SortedKeyValueIterator<Key,Value>> openMapFiles(Collection<String> files, VolumeManager fs, AccumuloConfiguration conf) throws IOException {
+ List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+ for (String file : files) {
+ FileSystem ns = fs.getFileSystemByPath(new Path(file));
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file, true, ns, ns.getConf(), conf);
+ readers.add(reader);
+ }
+ return readers;
+ }
+
+ private SortedKeyValueIterator<Key,Value> createSystemIter(Range r, List<SortedKeyValueIterator<Key,Value>> readers, HashSet<Column> columns)
+ throws IOException {
+ MultiIterator multiIterator = new MultiIterator(readers, false);
+ DeletingIterator delIter = new DeletingIterator(multiIterator, false);
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+ ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, columns);
+ VisibilityFilter visFilter = new VisibilityFilter(colFilter, Authorizations.EMPTY, new byte[0]);
+
+ visFilter.seek(r, LocalityGroupUtil.EMPTY_CF_SET, false);
+
+ VersioningIterator vi = new VersioningIterator();
+ Map<String,String> opts = new HashMap<String,String>();
+ opts.put("maxVersions", "1");
+ vi.init(visFilter, opts, null);
+
+ return vi;
+ }
+
+ private static class MyEntry implements Map.Entry<Key,Value> {
+
+ private Key k;
+ private Value v;
+
+ MyEntry(Key k, Value v) {
+ this.k = k;
+ this.v = v;
+ }
+
+ @Override
+ public Key getKey() {
+ return k;
+ }
+
+ @Override
+ public Value getValue() {
+ return v;
+ }
+
+ @Override
+ public Value setValue(Value value) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ public OfflineMetadataScanner(AccumuloConfiguration conf, VolumeManager fs) throws IOException {
+ super();
+ this.fs = fs;
+ this.conf = conf;
+ List<LogEntry> rwal;
+ try {
+ rwal = MetadataTableUtil.getLogEntries(null, RootTable.EXTENT);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check if root tablet has write ahead log entries", e);
+ }
+
+ if (rwal.size() > 0) {
+ throw new RuntimeException("Root tablet has write ahead logs, can not scan offline");
+ }
+
+ FileStatus[] rootFiles = fs.listStatus(new Path(MetadataTableUtil.getRootTabletDir()));
+
+ for (FileStatus rootFile : rootFiles) {
+ allFiles.add(rootFile.getPath().toString());
+ }
+
+ List<SortedKeyValueIterator<Key,Value>> readers = openMapFiles(allFiles, fs, conf);
+
+ HashSet<Column> columns = new HashSet<Column>();
+ columns.add(new Column(TextUtil.getBytes(DataFileColumnFamily.NAME), null, null));
+ columns.add(new Column(TextUtil.getBytes(LogColumnFamily.NAME), null, null));
+
+ SortedKeyValueIterator<Key,Value> ssi = createSystemIter(new Range(), readers, columns);
+
+ int walogs = 0;
+
+ while (ssi.hasTop()) {
+ if (ssi.getTopKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
+ allFiles.add(fs.getFullPath(ssi.getTopKey()).toString());
+ } else {
+ walogs++;
+ }
+ ssi.next();
+ }
+
+ closeReaders(readers);
+
+ if (walogs > 0) {
+ throw new RuntimeException("Metadata tablets have write ahead logs, can not scan offline");
+ }
+ }
+
+ private void closeReaders(List<SortedKeyValueIterator<Key,Value>> readers) throws IOException {
+ for (SortedKeyValueIterator<Key,Value> reader : readers) {
+ ((FileSKVIterator) reader).close();
+ }
+ }
+
+ @Override
+ public int getBatchSize() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Range getRange() {
+ return range;
+ }
+
+ @Deprecated
+ @Override
+ public int getTimeOut() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<Entry<Key,Value>> iterator() {
+
+ final SortedKeyValueIterator<Key,Value> ssi;
+ final List<SortedKeyValueIterator<Key,Value>> readers;
+ try {
+ readers = openMapFiles(allFiles, fs, conf);
+ ssi = createSystemIter(range, readers, new HashSet<Column>(getFetchedColumns()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return new Iterator<Entry<Key,Value>>() {
+
+ @Override
+ public boolean hasNext() {
+ return ssi.hasTop() && range.contains(ssi.getTopKey());
+ }
+
+ @Override
+ public Entry<Key,Value> next() {
+ if (!ssi.hasTop()) {
+ throw new NoSuchElementException();
+ }
+
+ MyEntry e = new MyEntry(new Key(ssi.getTopKey()), new Value(ssi.getTopValue()));
+ try {
+ ssi.next();
+ } catch (IOException e1) {
+ throw new RuntimeException(e1);
+ }
+ return e;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+
+ @Override
+ public void setBatchSize(int size) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setRange(Range range) {
+ this.range = range;
+ }
+
+ @Deprecated
+ @Override
+ public void setTimeOut(int timeOut) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void enableIsolation() {
+ // should always give an isolated view since it is scanning immutable files
+ }
+
+ @Override
+ public void disableIsolation() {
+
+ }
+
+ public static void main(String[] args) throws IOException {
+ ServerConfiguration conf = new ServerConfiguration(HdfsZooInstance.getInstance());
+ VolumeManager fs = VolumeManagerImpl.get();
+ OfflineMetadataScanner scanner = new OfflineMetadataScanner(conf.getConfiguration(), fs);
+ scanner.setRange(MetadataSchema.TabletsSection.getRange());
+ for (Entry<Key,Value> entry : scanner)
+ System.out.println(entry.getKey() + " " + entry.getValue());
+ }
+
+ @Override
+ public long getReadaheadThreshold() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setReadaheadThreshold(long batches) {
+ throw new UnsupportedOperationException();
+ }
+
+}
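
OfflineMetadataScanner stacks the same system iterators a tablet server applies at read time: MultiIterator merges the RFile readers, DeletingIterator suppresses deleted entries, the column-family and column-qualifier filters narrow the view, VisibilityFilter applies (empty) authorizations, and VersioningIterator keeps one version per key. A minimal usage sketch beyond the main() above, restricting the scan to data-file entries (assumes an offline instance whose metadata files are readable; the class name below is illustrative):

    import java.util.Map.Entry;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.metadata.schema.MetadataSchema;
    import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
    import org.apache.accumulo.server.client.HdfsZooInstance;
    import org.apache.accumulo.server.conf.ServerConfiguration;
    import org.apache.accumulo.server.fs.VolumeManager;
    import org.apache.accumulo.server.fs.VolumeManagerImpl;
    import org.apache.accumulo.server.util.OfflineMetadataScanner;

    public class ListOfflineDataFiles {
      public static void main(String[] args) throws Exception {
        ServerConfiguration conf = new ServerConfiguration(HdfsZooInstance.getInstance());
        VolumeManager fs = VolumeManagerImpl.get();
        OfflineMetadataScanner scanner = new OfflineMetadataScanner(conf.getConfiguration(), fs);
        scanner.setRange(MetadataSchema.TabletsSection.getRange());
        // fetch only the file column family; the constructor already verified no walogs exist
        scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
        for (Entry<Key,Value> entry : scanner)
          System.out.println(entry.getKey().getRow() + " " + entry.getKey().getColumnQualifier());
      }
    }
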
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/PortUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/PortUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/PortUtils.java
new file mode 100644
index 0000000..f710b0f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/PortUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Random;
+
+public class PortUtils {
+
+ public static int getRandomFreePort() {
+ Random r = new Random();
+ int count = 0;
+
+ while (count < 13) {
+ int port = r.nextInt((1 << 16) - 1024) + 1024;
+
+ ServerSocket so = null;
+ try {
+ so = new ServerSocket(port);
+ so.setReuseAddress(true);
+ return port;
+ } catch (IOException ioe) {
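+ // port unavailable (already bound or not permitted); try another random port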
+
+ } finally {
+ if (so != null)
+ try {
+ so.close();
+ } catch (IOException e) {}
+ }
+
+ }
+
+ throw new RuntimeException("Unable to find port");
+ }
+
+}
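
getRandomFreePort() probes up to 13 random ports in [1024, 65536) by binding a ServerSocket and returns the first one that binds; the check is inherently racy, since the socket is closed before the caller rebinds it. A minimal usage sketch (the class name below is illustrative):

    import java.net.ServerSocket;

    import org.apache.accumulo.server.util.PortUtils;

    public class PortUtilsExample {
      public static void main(String[] args) throws Exception {
        int port = PortUtils.getRandomFreePort();
        // rebind promptly; another process could claim the port in between
        ServerSocket server = new ServerSocket(port);
        System.out.println("listening on " + server.getLocalPort());
        server.close();
      }
    }
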
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java b/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
new file mode 100644
index 0000000..0666297
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.cli.ClientOnDefaultTable;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.Parameter;
+
+public class RandomWriter {
+
+ private static String table_name = "test_write_table";
+ private static int num_columns_per_row = 1;
+ private static int num_payload_bytes = 1024;
+ private static final Logger log = Logger.getLogger(RandomWriter.class);
+
+ public static class RandomMutationGenerator implements Iterable<Mutation>, Iterator<Mutation> {
+ private long max_mutations;
+ private int mutations_so_far = 0;
+ private Random r = new Random();
+ private static final Logger log = Logger.getLogger(RandomMutationGenerator.class);
+
+ public RandomMutationGenerator(long num_mutations) {
+ max_mutations = num_mutations;
+ }
+
+    @Override
+    public boolean hasNext() {
+      return mutations_so_far < max_mutations;
+    }
+
+    @Override
+    public Mutation next() {
+      // mask the sign bit so the row id is non-negative
+      Text row_value = new Text(Long.toString(((r.nextLong() & 0x7fffffffffffffffL) / 177) % 100000000000L));
+ Mutation m = new Mutation(row_value);
+ for (int column = 0; column < num_columns_per_row; column++) {
+ Text column_fam = new Text("col_fam");
+ byte[] bytes = new byte[num_payload_bytes];
+ r.nextBytes(bytes);
+ m.put(column_fam, new Text("" + column), new Value(bytes));
+ }
+ mutations_so_far++;
+ if (mutations_so_far % 1000000 == 0) {
+ log.info("Created " + mutations_so_far + " mutations so far");
+ }
+ return m;
+ }
+
+    @Override
+    public void remove() {
+      // mutations are generated on the fly; there is nothing to remove
+      throw new UnsupportedOperationException();
+    }
+
+ @Override
+ public Iterator<Mutation> iterator() {
+ return this;
+ }
+ }
+
+  static class Opts extends ClientOnDefaultTable {
+ @Parameter(names="--count", description="number of mutations to write", required=true)
+ long count;
+
+ Opts(String table) { super(table); }
+ }
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts(table_name);
+ BatchWriterOpts bwOpts = new BatchWriterOpts();
+ opts.parseArgs(RandomWriter.class.getName(), args, bwOpts);
+
+ long start = System.currentTimeMillis();
+ log.info("starting at " + start + " for user " + opts.principal);
+ try {
+ Connector connector = opts.getConnector();
+ BatchWriter bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+ log.info("Writing " + opts.count + " mutations...");
+ bw.addMutations(new RandomMutationGenerator(opts.count));
+ bw.close();
+ } catch (Exception e) {
+ log.error(e);
+ throw e;
+ }
+ long stop = System.currentTimeMillis();
+
+ log.info("stopping at " + stop);
+ log.info("elapsed: " + (((double) stop - (double) start) / 1000.0));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
new file mode 100644
index 0000000..0b4f896
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Remove file entries for map files that don't exist.
+ */
+public class RemoveEntriesForMissingFiles {
+
+ static class Opts extends ClientOpts {
+ @Parameter(names = "--fix")
+ boolean fix = false;
+ }
+
+ private static class CheckFileTask implements Runnable {
+ @SuppressWarnings("rawtypes")
+ private Map cache;
+ private VolumeManager fs;
+ private AtomicInteger missing;
+ private BatchWriter writer;
+ private Key key;
+ private Path path;
+ private Set<Path> processing;
+ private AtomicReference<Exception> exceptionRef;
+
+ @SuppressWarnings({"rawtypes"})
+ CheckFileTask(Map cache, VolumeManager fs, AtomicInteger missing, BatchWriter writer, Key key, Path map, Set<Path> processing,
+ AtomicReference<Exception> exceptionRef) {
+ this.cache = cache;
+ this.fs = fs;
+ this.missing = missing;
+ this.writer = writer;
+ this.key = key;
+ this.path = map;
+ this.processing = processing;
+ this.exceptionRef = exceptionRef;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+    public void run() {
+      try {
+        // if the referenced file is gone, either delete the metadata entry
+        // (when a writer was supplied for --fix) or just report it
+        if (!fs.exists(path)) {
+ missing.incrementAndGet();
+
+ Mutation m = new Mutation(key.getRow());
+ m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+ if (writer != null) {
+ writer.addMutation(m);
+ System.out.println("Reference " + path + " removed from " + key.getRow());
+ } else {
+ System.out.println("File " + path + " is missing");
+ }
+ } else {
+ synchronized (processing) {
+ cache.put(path, path);
+ }
+ }
+ } catch (Exception e) {
+ exceptionRef.compareAndSet(null, e);
+ } finally {
+ synchronized (processing) {
+ processing.remove(path);
+ processing.notify();
+ }
+ }
+ }
+ }
+
+ private static int checkTable(Instance instance, String principal, AuthenticationToken token, String table, Range range, boolean fix) throws Exception {
+
+ @SuppressWarnings({"rawtypes"})
+ Map cache = new LRUMap(100000);
+ Set<Path> processing = new HashSet<Path>();
+ ExecutorService threadPool = Executors.newFixedThreadPool(16);
+
+ System.out.printf("Scanning : %s %s\n", table, range);
+
+ VolumeManager fs = VolumeManagerImpl.get();
+ Connector connector = instance.getConnector(principal, token);
+ Scanner metadata = connector.createScanner(table, Authorizations.EMPTY);
+ metadata.setRange(range);
+ metadata.fetchColumnFamily(DataFileColumnFamily.NAME);
+ int count = 0;
+ AtomicInteger missing = new AtomicInteger(0);
+ AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>(null);
+ BatchWriter writer = null;
+
+ if (fix)
+ writer = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+ for (Entry<Key,Value> entry : metadata) {
+ if (exceptionRef.get() != null)
+ break;
+
+ count++;
+ Key key = entry.getKey();
+ Path map = fs.getFullPath(key);
+
+      synchronized (processing) {
+        // throttle: allow at most 64 outstanding checks and never check the
+        // same path from two threads at once
+        while (processing.size() >= 64 || processing.contains(map))
+          processing.wait();
+
+        // skip paths already verified to exist
+        if (cache.get(map) != null) {
+          continue;
+        }
+
+        processing.add(map);
+      }
+
+ threadPool.submit(new CheckFileTask(cache, fs, missing, writer, key, map, processing, exceptionRef));
+ }
+
+ threadPool.shutdown();
+
+ synchronized (processing) {
+ while (processing.size() > 0)
+ processing.wait();
+ }
+
+ if (exceptionRef.get() != null)
+ throw new AccumuloException(exceptionRef.get());
+
+ if (writer != null && missing.get() > 0)
+ writer.close();
+
+ System.out.printf("Scan finished, %d files of %d missing\n\n", missing.get(), count);
+
+ return missing.get();
+ }
+
+ static int checkAllTables(Instance instance, String principal, AuthenticationToken token, boolean fix) throws Exception {
+ int missing = checkTable(instance, principal, token, RootTable.NAME, MetadataSchema.TabletsSection.getRange(), fix);
+
+ if (missing == 0)
+ return checkTable(instance, principal, token, MetadataTable.NAME, MetadataSchema.TabletsSection.getRange(), fix);
+ else
+ return missing;
+ }
+
+ static int checkTable(Instance instance, String principal, AuthenticationToken token, String tableName, boolean fix) throws Exception {
+ if (tableName.equals(RootTable.NAME)) {
+ throw new IllegalArgumentException("Can not check root table");
+ } else if (tableName.equals(MetadataTable.NAME)) {
+ return checkTable(instance, principal, token, RootTable.NAME, MetadataSchema.TabletsSection.getRange(), fix);
+ } else {
+ String tableId = Tables.getTableId(instance, tableName);
+ Range range = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
+ return checkTable(instance, principal, token, MetadataTable.NAME, range, fix);
+ }
+ }
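+
+  // Example invocation (hypothetical connection values). Without --fix the
+  // tool only reports dangling file references; with --fix it deletes them:
+  //
+  //   accumulo org.apache.accumulo.server.util.RemoveEntriesForMissingFiles \
+  //       -i instance -z zk1:2181 -u root -p secret --fix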
+
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ ScannerOpts scanOpts = new ScannerOpts();
+ BatchWriterOpts bwOpts = new BatchWriterOpts();
+ opts.parseArgs(RemoveEntriesForMissingFiles.class.getName(), args, scanOpts, bwOpts);
+
+ checkAllTables(opts.getInstance(), opts.principal, opts.getToken(), opts.fix);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/RestoreZookeeper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RestoreZookeeper.java b/server/base/src/main/java/org/apache/accumulo/server/util/RestoreZookeeper.java
new file mode 100644
index 0000000..6e5607e
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RestoreZookeeper.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Stack;
+
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+import com.beust.jcommander.Parameter;
+
+public class RestoreZookeeper {
+
+ private static class Restore extends DefaultHandler {
+ IZooReaderWriter zk = null;
+ Stack<String> cwd = new Stack<String>();
+ boolean overwrite = false;
+
+ Restore(IZooReaderWriter zk, boolean overwrite) {
+ this.zk = zk;
+ this.overwrite = overwrite;
+ }
+
+ @Override
+ public void startElement(String uri, String localName, String name, Attributes attributes) throws SAXException {
+ if ("node".equals(name)) {
+ String child = attributes.getValue("name");
+ if (child == null)
+ throw new RuntimeException("name attribute not set");
+ String encoding = attributes.getValue("encoding");
+ String value = attributes.getValue("value");
+ if (value == null)
+ value = "";
+ String path = cwd.lastElement() + "/" + child;
+ create(path, value, encoding);
+ cwd.push(path);
+ } else if ("dump".equals(name)) {
+ String root = attributes.getValue("root");
+ if (root.equals("/"))
+ cwd.push("");
+ else
+ cwd.push(root);
+ create(root, "", Constants.UTF8.name());
+ }
+ }
+
+ @Override
+ public void endElement(String uri, String localName, String name) throws SAXException {
+ cwd.pop();
+ }
+
+ // assume UTF-8 if not "base64"
+ private void create(String path, String value, String encoding) {
+ byte[] data = value.getBytes(Constants.UTF8);
+ if ("base64".equals(encoding))
+ data = Base64.decodeBase64(data);
+ try {
+ try {
+ zk.putPersistentData(path, data, overwrite ? NodeExistsPolicy.OVERWRITE : NodeExistsPolicy.FAIL);
+ } catch (KeeperException e) {
+ if (e.code().equals(KeeperException.Code.NODEEXISTS))
+ throw new RuntimeException(path + " exists. Remove it first.");
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
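+
+  // The handler above expects input shaped like the following (an
+  // illustrative sketch, not a verbatim dump): a single <dump> element whose
+  // root attribute names the base path, containing nested <node> elements
+  // with name, optional encoding ("base64"), and value attributes.
+  //
+  //   <dump root="/accumulo">
+  //     <node name="instances" value="">
+  //       <node name="test" encoding="base64" value="dGVzdA=="/>
+  //     </node>
+  //   </dump>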
+
+ static class Opts extends Help {
+ @Parameter(names = {"-z", "--keepers"})
+ String keepers = "localhost:2181";
+ @Parameter(names = "--overwrite")
+ boolean overwrite = false;
+ @Parameter(names = "--file")
+ String file;
+ }
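+
+  // Example invocation (hypothetical values), restoring a previously captured
+  // XML dump into a local ZooKeeper:
+  //
+  //   accumulo org.apache.accumulo.server.util.RestoreZookeeper \
+  //       -z localhost:2181 --overwrite --file zk-dump.xml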
+
+  /**
+   * Reads an XML dump of ZooKeeper nodes from stdin (or from --file) and
+   * recreates the nodes under the dump's root path.
+   */
+ public static void main(String[] args) throws Exception {
+ Logger.getRootLogger().setLevel(Level.WARN);
+ Opts opts = new Opts();
+ opts.parseArgs(RestoreZookeeper.class.getName(), args);
+
+ InputStream in = System.in;
+ if (opts.file != null) {
+ in = new FileInputStream(opts.file);
+ }
+
+ SAXParserFactory factory = SAXParserFactory.newInstance();
+ SAXParser parser = factory.newSAXParser();
+ parser.parse(in, new Restore(ZooReaderWriter.getInstance(), opts.overwrite));
+ in.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java b/server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
new file mode 100644
index 0000000..d80adc9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.Socket;
+import java.net.URLEncoder;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.net.SocketFactory;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang.math.LongRange;
+import org.apache.log4j.Category;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.Filter;
+import org.apache.log4j.spi.LocationInfo;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.spi.ThrowableInformation;
+import org.apache.log4j.varia.LevelRangeFilter;
+import org.apache.log4j.xml.XMLLayout;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
+
+public class SendLogToChainsaw extends XMLLayout {
+
+  // matches log4j file appender lines of the form
+  // "dd HH:mm:ss,SSS [class] LEVEL: message"
+  private static Pattern logPattern = Pattern.compile(
+      "^(\\d\\d)\\s(\\d\\d):(\\d\\d):(\\d\\d),(\\d\\d\\d)\\s\\[(.*)\\]\\s(TRACE|DEBUG|INFO|WARN|FATAL|ERROR)\\s*?:(.*)$", Pattern.UNIX_LINES);
+
+ private File[] logFiles = null;
+
+ private SocketFactory factory = SocketFactory.getDefault();
+
+ private WildcardFileFilter fileFilter = null;
+
+ private Socket socket = null;
+
+ private Pattern lineFilter = null;
+
+ private LongRange dateFilter = null;
+
+ private LevelRangeFilter levelFilter = null;
+
+ public SendLogToChainsaw(String directory, String fileNameFilter, String host, int port, Date start, Date end, String regex, String level) throws Exception {
+
+ // Set up the file name filter
+ if (null != fileNameFilter) {
+ fileFilter = new WildcardFileFilter(fileNameFilter);
+ } else {
+ fileFilter = new WildcardFileFilter("*");
+ }
+
+ // Get the list of files that match
+ File dir = new File(directory);
+ if (dir.isDirectory()) {
+ logFiles = dir.listFiles((FilenameFilter) fileFilter);
+ } else {
+ throw new IllegalArgumentException(directory + " is not a directory or is not readable.");
+ }
+
+ if (logFiles.length == 0) {
+ throw new IllegalArgumentException("No files match the supplied filter.");
+ }
+
+ socket = factory.createSocket(host, port);
+
+    // the message regex is optional; only compile a filter when one was given
+    if (null != regex)
+      lineFilter = Pattern.compile(regex);
+
+ // Create Date Filter
+ if (null != start) {
+ if (end == null)
+ end = new Date(System.currentTimeMillis());
+ dateFilter = new LongRange(start.getTime(), end.getTime());
+ }
+
+ if (null != level) {
+ Level base = Level.toLevel(level.toUpperCase());
+ levelFilter = new LevelRangeFilter();
+ levelFilter.setAcceptOnMatch(true);
+ levelFilter.setLevelMin(base);
+ levelFilter.setLevelMax(Level.FATAL);
+ }
+ }
+
+ public void processLogFiles() throws IOException {
+ String line = null;
+ String out = null;
+ FileReader fReader = null;
+ BufferedReader reader = null;
+ try {
+ for (File log : logFiles) {
+        // Parse the server type and name from the log file name (everything
+        // before the first dot); fall back to the whole name if there is none
+        String fileName = log.getName();
+        String threadName = fileName.contains(".") ? fileName.substring(0, fileName.indexOf('.')) : fileName;
+ try {
+ fReader = new FileReader(log);
+ } catch (FileNotFoundException e) {
+ System.out.println("Unable to find file: " + log.getAbsolutePath());
+ throw e;
+ }
+ reader = new BufferedReader(fReader);
+
+ try {
+ line = reader.readLine();
+ while (null != line) {
+ out = convertLine(line, threadName);
+ if (null != out) {
+ if (socket != null && socket.isConnected())
+                socket.getOutputStream().write(out.getBytes(Constants.UTF8));
+ else
+ System.err.println("Unable to send data to transport");
+ }
+ line = reader.readLine();
+ }
+ } catch (IOException e) {
+ System.out.println("Error processing line: " + line + ". Output was " + out);
+ throw e;
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ if (fReader != null) {
+ fReader.close();
+ }
+ }
+ }
+ } finally {
+ if (socket != null && socket.isConnected()) {
+ socket.close();
+ }
+ }
+ }
+
+ private String convertLine(String line, String threadName) throws UnsupportedEncodingException {
+ String result = null;
+ Matcher m = logPattern.matcher(line);
+ if (m.matches()) {
+
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(new Date(System.currentTimeMillis()));
+ Integer date = Integer.parseInt(m.group(1));
+ Integer hour = Integer.parseInt(m.group(2));
+ Integer min = Integer.parseInt(m.group(3));
+ Integer sec = Integer.parseInt(m.group(4));
+ Integer ms = Integer.parseInt(m.group(5));
+ String clazz = m.group(6);
+ String level = m.group(7);
+ String message = m.group(8);
+ // Apply the regex filter if supplied
+ if (null != lineFilter) {
+ Matcher match = lineFilter.matcher(message);
+ if (!match.matches())
+ return null;
+ }
+ // URL encode the message
+ message = URLEncoder.encode(message, Constants.UTF8.name());
+      // Assume we are processing logs from today. If the day of month parsed
+      // from the line is greater than today's, the entry must be from the
+      // previous month, so roll the calendar back.
+      int today = cal.get(Calendar.DAY_OF_MONTH);
+      cal.set(Calendar.DATE, date);
+      cal.set(Calendar.HOUR_OF_DAY, hour);
+      cal.set(Calendar.MINUTE, min);
+      cal.set(Calendar.SECOND, sec);
+      cal.set(Calendar.MILLISECOND, ms);
+      if (date > today) {
+        cal.add(Calendar.MONTH, -1);
+      }
+ long ts = cal.getTimeInMillis();
+ // If this event is not between the start and end dates, then skip it.
+ if (null != dateFilter && !dateFilter.containsLong(ts))
+ return null;
+ Category c = Logger.getLogger(clazz);
+ Level l = Level.toLevel(level);
+ LoggingEvent event = new LoggingEvent(clazz, c, ts, l, message, threadName, (ThrowableInformation) null, (String) null, (LocationInfo) null,
+ (Map<?,?>) null);
+ // Check the log level filter
+ if (null != levelFilter && (levelFilter.decide(event) == Filter.DENY)) {
+ return null;
+ }
+ result = format(event);
+ }
+ return result;
+ }
+
+ private static class DateConverter implements IStringConverter<Date> {
+ @Override
+ public Date convert(String value) {
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
+ try {
+ return formatter.parse(value);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ private static class Opts extends Help {
+
+ @Parameter(names = {"-d", "--logDirectory"}, description = "ACCUMULO log directory path", required = true)
+ String dir;
+
+ @Parameter(names = {"-f", "--fileFilter"}, description = "filter to apply to names of logs")
+ String filter;
+
+ @Parameter(names = {"-h", "--host"}, description = "host where chainsaw is running", required = true)
+ String hostname;
+
+ @Parameter(names = {"-p", "--port"}, description = "port where XMLSocketReceiver is listening", required = true)
+ int portnum;
+
+ @Parameter(names = {"-s", "--start"}, description = "start date filter (yyyyMMddHHmmss)", required = true, converter = DateConverter.class)
+ Date startDate;
+
+ @Parameter(names = {"-e", "--end"}, description = "end date filter (yyyyMMddHHmmss)", required = true, converter = DateConverter.class)
+ Date endDate;
+
+ @Parameter(names = {"-l", "--level"}, description = "filter log level")
+ String level;
+
+ @Parameter(names = {"-m", "--messageFilter"}, description = "regex filter for log messages")
+ String regex;
+ }
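+
+  // Example invocation (hypothetical paths and host), streaming WARN-and-above
+  // entries from tserver logs to a Chainsaw XMLSocketReceiver:
+  //
+  //   accumulo org.apache.accumulo.server.util.SendLogToChainsaw \
+  //       -d /var/log/accumulo -f "tserver*" -h localhost -p 4445 \
+  //       -s 20131101000000 -e 20131102000000 -l WARN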
+
+ /**
+ *
+ * @param args
+ * <ol>
+ * <li>path to log directory</li>
+   * <li>filter to apply to the names of logs to include (uses wildcards, e.g. logger*, and is case sensitive)</li>
+ * <li>chainsaw host</li>
+ * <li>chainsaw port</li>
+ * <li>start date filter</li>
+ * <li>end date filter</li>
+ * <li>optional regex filter to match on each log4j message</li>
+ * <li>optional level filter</li>
+ * </ol>
+ */
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(SendLogToChainsaw.class.getName(), args);
+
+ SendLogToChainsaw c = new SendLogToChainsaw(opts.dir, opts.filter, opts.hostname, opts.portnum, opts.startDate, opts.endDate, opts.regex, opts.level);
+ c.processLogFiles();
+ }
+
+}