You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:55:57 UTC
[18/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
deleted file mode 100644
index 6dd9d2c..0000000
--- a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.gc;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
-import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor;
-import org.apache.accumulo.core.gc.thrift.GCStatus;
-import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ServerServices.Service;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.server.Accumulo;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.ServerOpts;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.master.state.tables.TableManager;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.TServerUtils;
-import org.apache.accumulo.server.util.TabletIterator;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.trace.instrument.CountSampler;
-import org.apache.accumulo.trace.instrument.Sampler;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
-import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-import com.beust.jcommander.Parameter;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.net.HostAndPort;
-
-public class SimpleGarbageCollector implements Iface {
- private static final Text EMPTY_TEXT = new Text();
-
- static class Opts extends ServerOpts {
- @Parameter(names = {"-v", "--verbose"}, description = "extra information will get printed to stdout also")
- boolean verbose = false;
- @Parameter(names = {"-s", "--safemode"}, description = "safe mode will not delete files")
- boolean safeMode = false;
- }
-
- // how much of the JVM's available memory should it use gathering candidates
- private static final float CANDIDATE_MEMORY_PERCENTAGE = 0.75f;
-
- private static final Logger log = Logger.getLogger(SimpleGarbageCollector.class);
-
- private Credentials credentials;
- private long gcStartDelay;
- private VolumeManager fs;
- private boolean useTrash = true;
- private Opts opts = new Opts();
- private ZooLock lock;
-
- private GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
-
- private int numDeleteThreads;
-
- private Instance instance;
-
- public static void main(String[] args) throws UnknownHostException, IOException {
- SecurityUtil.serverLogin();
- Instance instance = HdfsZooInstance.getInstance();
- ServerConfiguration serverConf = new ServerConfiguration(instance);
- final VolumeManager fs = VolumeManagerImpl.get();
- Accumulo.init(fs, serverConf, "gc");
- Opts opts = new Opts();
- opts.parseArgs("gc", args);
- SimpleGarbageCollector gc = new SimpleGarbageCollector(opts);
-
- gc.init(fs, instance, SystemCredentials.get(), serverConf.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE));
- Accumulo.enableTracing(opts.getAddress(), "gc");
- gc.run();
- }
-
- public SimpleGarbageCollector(Opts opts) {
- this.opts = opts;
- }
-
- public void init(VolumeManager fs, Instance instance, Credentials credentials, boolean noTrash) throws IOException {
- this.fs = fs;
- this.credentials = credentials;
- this.instance = instance;
-
- gcStartDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_START);
- long gcDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
- numDeleteThreads = instance.getConfiguration().getCount(Property.GC_DELETE_THREADS);
- log.info("start delay: " + gcStartDelay + " milliseconds");
- log.info("time delay: " + gcDelay + " milliseconds");
- log.info("safemode: " + opts.safeMode);
- log.info("verbose: " + opts.verbose);
- log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes");
- log.info("delete threads: " + numDeleteThreads);
- useTrash = !noTrash;
- }
-
- private class GCEnv implements GarbageCollectionEnvironment {
-
- private String tableName;
-
- GCEnv(String tableName) {
- this.tableName = tableName;
- }
-
- @Override
- public List<String> getCandidates(String continuePoint) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- // want to ensure GC makes progress... if the 1st N deletes are stable and we keep processing them,
- // then will never inspect deletes after N
- Range range = MetadataSchema.DeletesSection.getRange();
- if (continuePoint != null && !continuePoint.isEmpty()) {
- String continueRow = MetadataSchema.DeletesSection.getRowPrefix() + continuePoint;
- range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
- }
-
- Scanner scanner = instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName, Authorizations.EMPTY);
- scanner.setRange(range);
- List<String> result = new ArrayList<String>();
- // find candidates for deletion; chop off the prefix
- for (Entry<Key,Value> entry : scanner) {
- String cand = entry.getKey().getRow().toString().substring(MetadataSchema.DeletesSection.getRowPrefix().length());
- result.add(cand);
- if (almostOutOfMemory()) {
- log.info("List of delete candidates has exceeded the memory threshold. Attempting to delete what has been gathered so far.");
- break;
- }
- }
-
- return result;
-
- }
-
- @Override
- public Iterator<String> getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- IsolatedScanner scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName,
- Authorizations.EMPTY));
-
- scanner.setRange(MetadataSchema.BlipSection.getRange());
-
- return Iterators.transform(scanner.iterator(), new Function<Entry<Key,Value>,String>() {
- @Override
- public String apply(Entry<Key,Value> entry) {
- return entry.getKey().getRow().toString().substring(MetadataSchema.BlipSection.getRowPrefix().length());
- }
- });
- }
-
- @Override
- public Iterator<Entry<Key,Value>> getReferenceIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- IsolatedScanner scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName,
- Authorizations.EMPTY));
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner.fetchColumnFamily(ScanFileColumnFamily.NAME);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
- TabletIterator tabletIterator = new TabletIterator(scanner, MetadataSchema.TabletsSection.getRange(), false, true);
-
- return Iterators.concat(Iterators.transform(tabletIterator, new Function<Map<Key,Value>,Iterator<Entry<Key,Value>>>() {
- @Override
- public Iterator<Entry<Key,Value>> apply(Map<Key,Value> input) {
- return input.entrySet().iterator();
- }
- }));
- }
-
- @Override
- public Set<String> getTableIDs() {
- return Tables.getIdToNameMap(instance).keySet();
- }
-
- @Override
- public void delete(SortedMap<String,String> confirmedDeletes) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
-
- if (opts.safeMode) {
- if (opts.verbose)
- System.out.println("SAFEMODE: There are " + confirmedDeletes.size() + " data file candidates marked for deletion.%n"
- + " Examine the log files to identify them.%n");
- log.info("SAFEMODE: Listing all data file candidates for deletion");
- for (String s : confirmedDeletes.values())
- log.info("SAFEMODE: " + s);
- log.info("SAFEMODE: End candidates for deletion");
- return;
- }
-
- Connector c = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());
- BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig());
-
- // when deleting a dir and all files in that dir, only need to delete the dir
- // the dir will sort right before the files... so remove the files in this case
- // to minimize namenode ops
- Iterator<Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator();
-
- String lastDir = null;
- while (cdIter.hasNext()) {
- Entry<String,String> entry = cdIter.next();
- String relPath = entry.getKey();
- String absPath = fs.getFullPath(FileType.TABLE, entry.getValue()).toString();
-
- if (isDir(relPath)) {
- lastDir = absPath;
- } else if (lastDir != null) {
- if (absPath.startsWith(lastDir)) {
- log.debug("Ignoring " + entry.getValue() + " because " + lastDir + " exist");
- try {
- putMarkerDeleteMutation(entry.getValue(), writer);
- } catch (MutationsRejectedException e) {
- throw new RuntimeException(e);
- }
- cdIter.remove();
- } else {
- lastDir = null;
- }
- }
- }
-
- final BatchWriter finalWriter = writer;
-
- ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads, new NamingThreadFactory("deleting"));
-
- for (final String delete : confirmedDeletes.values()) {
-
- Runnable deleteTask = new Runnable() {
- @Override
- public void run() {
- boolean removeFlag;
-
- try {
- Path fullPath = fs.getFullPath(FileType.TABLE, delete);
-
- log.debug("Deleting " + fullPath);
-
- if (moveToTrash(fullPath) || fs.deleteRecursively(fullPath)) {
- // delete succeeded, still want to delete
- removeFlag = true;
- synchronized (SimpleGarbageCollector.this) {
- ++status.current.deleted;
- }
- } else if (fs.exists(fullPath)) {
- // leave the entry in the METADATA table; we'll try again
- // later
- removeFlag = false;
- synchronized (SimpleGarbageCollector.this) {
- ++status.current.errors;
- }
- log.warn("File exists, but was not deleted for an unknown reason: " + fullPath);
- } else {
- // this failure, we still want to remove the METADATA table
- // entry
- removeFlag = true;
- synchronized (SimpleGarbageCollector.this) {
- ++status.current.errors;
- }
- String parts[] = delete.split("/");
- if (parts.length > 2) {
- String tableId = parts[parts.length - 3];
- String tabletDir = parts[parts.length - 2];
- TableManager.getInstance().updateTableStateCache(tableId);
- TableState tableState = TableManager.getInstance().getTableState(tableId);
- if (tableState != null && tableState != TableState.DELETING) {
- // clone directories don't always exist
- if (!tabletDir.startsWith("c-"))
- log.warn("File doesn't exist: " + fullPath);
- }
- } else {
- log.warn("Very strange path name: " + delete);
- }
- }
-
- // proceed to clearing out the flags for successful deletes and
- // non-existent files
- if (removeFlag && finalWriter != null) {
- putMarkerDeleteMutation(delete, finalWriter);
- }
- } catch (Exception e) {
- log.error(e, e);
- }
-
- }
-
- };
-
- deleteThreadPool.execute(deleteTask);
- }
-
- deleteThreadPool.shutdown();
-
- try {
- while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {}
- } catch (InterruptedException e1) {
- log.error(e1, e1);
- }
-
- if (writer != null) {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- log.error("Problem removing entries from the metadata table: ", e);
- }
- }
- }
-
- @Override
- public void deleteTableDirIfEmpty(String tableID) throws IOException {
- // if dir exist and is empty, then empty list is returned...
- // hadoop 1.0 will return null if the file doesn't exist
- // hadoop 2.0 will throw an exception if the file does not exist
- for (String dir : ServerConstants.getTablesDirs()) {
- FileStatus[] tabletDirs = null;
- try {
- tabletDirs = fs.listStatus(new Path(dir + "/" + tableID));
- } catch (FileNotFoundException ex) {
- // ignored
- }
- if (tabletDirs == null)
- continue;
-
- if (tabletDirs.length == 0) {
- Path p = new Path(dir + "/" + tableID);
- log.debug("Removing table dir " + p);
- if (!moveToTrash(p))
- fs.delete(p);
- }
- }
- }
-
- @Override
- public void incrementCandidatesStat(long i) {
- status.current.candidates += i;
- }
-
- @Override
- public void incrementInUseStat(long i) {
- status.current.inUse += i;
- }
-
- }
-
- private void run() {
- long tStart, tStop;
-
- // Sleep for an initial period, giving the master time to start up and
- // old data files to be unused
-
- try {
- getZooLock(startStatsService());
- } catch (Exception ex) {
- log.error(ex, ex);
- System.exit(1);
- }
-
- try {
- log.debug("Sleeping for " + gcStartDelay + " milliseconds before beginning garbage collection cycles");
- Thread.sleep(gcStartDelay);
- } catch (InterruptedException e) {
- log.warn(e, e);
- return;
- }
-
- Sampler sampler = new CountSampler(100);
-
- while (true) {
- if (sampler.next())
- Trace.on("gc");
-
- Span gcSpan = Trace.start("loop");
- tStart = System.currentTimeMillis();
- try {
- System.gc(); // make room
-
- status.current.started = System.currentTimeMillis();
-
- new GarbageCollectionAlgorithm().collect(new GCEnv(RootTable.NAME));
- new GarbageCollectionAlgorithm().collect(new GCEnv(MetadataTable.NAME));
-
- log.info("Number of data file candidates for deletion: " + status.current.candidates);
- log.info("Number of data file candidates still in use: " + status.current.inUse);
- log.info("Number of successfully deleted data files: " + status.current.deleted);
- log.info("Number of data files delete failures: " + status.current.errors);
-
- status.current.finished = System.currentTimeMillis();
- status.last = status.current;
- status.current = new GcCycleStats();
-
- } catch (Exception e) {
- log.error(e, e);
- }
-
- tStop = System.currentTimeMillis();
- log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0)));
-
- // Clean up any unused write-ahead logs
- Span waLogs = Trace.start("walogs");
- try {
- GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, useTrash);
- log.info("Beginning garbage collection of write-ahead logs");
- walogCollector.collect(status);
- } catch (Exception e) {
- log.error(e, e);
- } finally {
- waLogs.stop();
- }
- gcSpan.stop();
-
- // we just made a lot of changes to the !METADATA table: flush them out
- try {
- Connector connector = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
- connector.tableOperations().compact(MetadataTable.NAME, null, null, true, true);
- connector.tableOperations().compact(RootTable.NAME, null, null, true, true);
- } catch (Exception e) {
- log.warn(e, e);
- }
-
- Trace.offNoFlush();
- try {
- long gcDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
- log.debug("Sleeping for " + gcDelay + " milliseconds");
- Thread.sleep(gcDelay);
- } catch (InterruptedException e) {
- log.warn(e, e);
- return;
- }
- }
- }
-
- private boolean moveToTrash(Path path) throws IOException {
- if (!useTrash)
- return false;
- try {
- return fs.moveToTrash(path);
- } catch (FileNotFoundException ex) {
- return false;
- }
- }
-
- private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException {
- String path = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZGC_LOCK;
-
- LockWatcher lockWatcher = new LockWatcher() {
- @Override
- public void lostLock(LockLossReason reason) {
- Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!");
- }
-
- @Override
- public void unableToMonitorLockNode(final Throwable e) {
- Halt.halt(-1, new Runnable() {
-
- @Override
- public void run() {
- log.fatal("No longer able to monitor lock node ", e);
- }
- });
-
- }
- };
-
- while (true) {
- lock = new ZooLock(path);
- if (lock.tryLock(lockWatcher, new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) {
- break;
- }
- UtilWaitThread.sleep(1000);
- }
- }
-
- private HostAndPort startStatsService() throws UnknownHostException {
- Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(this));
- int port = instance.getConfiguration().getPort(Property.GC_PORT);
- long maxMessageSize = instance.getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
- HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
- try {
- port = TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize).address.getPort();
- } catch (Exception ex) {
- log.fatal(ex, ex);
- throw new RuntimeException(ex);
- }
- return result;
- }
-
-
- static public boolean almostOutOfMemory() {
- Runtime runtime = Runtime.getRuntime();
- return runtime.totalMemory() - runtime.freeMemory() > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory();
- }
-
-
- final static String METADATA_TABLE_DIR = "/" + MetadataTable.ID;
-
- private static void putMarkerDeleteMutation(final String delete, final BatchWriter writer)
- throws MutationsRejectedException {
- Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + delete);
- m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
- writer.addMutation(m);
- }
-
-
- private boolean isDir(String delete) {
- int slashCount = 0;
- for (int i = 0; i < delete.length(); i++)
- if (delete.charAt(i) == '/')
- slashCount++;
- return slashCount == 1;
- }
-
- @Override
- public GCStatus getStatus(TInfo info, TCredentials credentials) {
- return status;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java b/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
deleted file mode 100644
index d8bcebe..0000000
--- a/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.iterators;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.Filter;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
-import org.apache.log4j.Logger;
-
-/**
- * A special iterator for the metadata table that removes inactive bulk load flags
- *
- */
-public class MetadataBulkLoadFilter extends Filter {
- private static Logger log = Logger.getLogger(MetadataBulkLoadFilter.class);
-
- enum Status {
- ACTIVE, INACTIVE
- }
-
- Map<Long,Status> bulkTxStatusCache;
- Arbitrator arbitrator;
-
- @Override
- public boolean accept(Key k, Value v) {
- if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) {
- long txid = Long.valueOf(v.toString());
-
- Status status = bulkTxStatusCache.get(txid);
- if (status == null) {
- try {
- if (arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid)) {
- status = Status.INACTIVE;
- } else {
- status = Status.ACTIVE;
- }
- } catch (Exception e) {
- status = Status.ACTIVE;
- log.error(e, e);
- }
-
- bulkTxStatusCache.put(txid, status);
- }
-
- return status == Status.ACTIVE;
- }
-
- return true;
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
-
- if (env.getIteratorScope() == IteratorScope.scan) {
- throw new IOException("This iterator not intended for use at scan time");
- }
-
- bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>();
- arbitrator = getArbitrator();
- }
-
- protected Arbitrator getArbitrator() {
- return new ZooArbitrator();
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/logger/LogEvents.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogEvents.java b/server/src/main/java/org/apache/accumulo/server/logger/LogEvents.java
deleted file mode 100644
index 8cbe033..0000000
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogEvents.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.logger;
-
-public enum LogEvents {
- // DO NOT CHANGE ORDER OF ENUMS, ORDER IS USED IN SERIALIZATION
- OPEN,
- DEFINE_TABLET,
- MUTATION,
- MANY_MUTATIONS,
- COMPACTION_START,
- COMPACTION_FINISH;
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/logger/LogFileKey.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogFileKey.java b/server/src/main/java/org/apache/accumulo/server/logger/LogFileKey.java
deleted file mode 100644
index c69d409..0000000
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogFileKey.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.logger;
-
-import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET;
-import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS;
-import static org.apache.accumulo.server.logger.LogEvents.MUTATION;
-import static org.apache.accumulo.server.logger.LogEvents.OPEN;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.hadoop.io.WritableComparable;
-
-public class LogFileKey implements WritableComparable<LogFileKey> {
-
- public LogEvents event;
- public String filename = null;
- public KeyExtent tablet = null;
- public long seq = -1;
- public int tid = -1;
- public static final int VERSION = 2;
- public String tserverSession;
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int value = in.readByte();
- event = LogEvents.values()[value];
- switch (event) {
- case OPEN:
- tid = in.readInt();
- tserverSession = in.readUTF();
- if (tid != VERSION) {
- throw new RuntimeException(String.format("Bad version number for log file: expected %d, but saw %d", VERSION, tid));
- }
- break;
- case COMPACTION_FINISH:
- seq = in.readLong();
- tid = in.readInt();
- break;
- case COMPACTION_START:
- seq = in.readLong();
- tid = in.readInt();
- filename = in.readUTF();
- break;
- case DEFINE_TABLET:
- seq = in.readLong();
- tid = in.readInt();
- tablet = new KeyExtent();
- tablet.readFields(in);
- break;
- case MANY_MUTATIONS:
- seq = in.readLong();
- tid = in.readInt();
- break;
- case MUTATION:
- seq = in.readLong();
- tid = in.readInt();
- break;
- default:
- throw new RuntimeException("Unknown log event type: " + event);
- }
-
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(event.ordinal());
- switch (event) {
- case OPEN:
- seq = -1;
- tid = -1;
- out.writeInt(VERSION);
- out.writeUTF(tserverSession);
- // out.writeUTF(Accumulo.getInstanceID());
- break;
- case COMPACTION_FINISH:
- out.writeLong(seq);
- out.writeInt(tid);
- break;
- case COMPACTION_START:
- out.writeLong(seq);
- out.writeInt(tid);
- out.writeUTF(filename);
- break;
- case DEFINE_TABLET:
- out.writeLong(seq);
- out.writeInt(tid);
- tablet.write(out);
- break;
- case MANY_MUTATIONS:
- out.writeLong(seq);
- out.writeInt(tid);
- break;
- case MUTATION:
- out.writeLong(seq);
- out.writeInt(tid);
- break;
- default:
- throw new IllegalArgumentException("Bad value for LogFileEntry type");
- }
- }
-
- static int eventType(LogEvents event) {
- // Order logs by START, TABLET_DEFINITIONS, COMPACTIONS and then MUTATIONS
- if (event == MUTATION || event == MANY_MUTATIONS) {
- return 3;
- }
- if (event == DEFINE_TABLET) {
- return 1;
- }
- if (event == OPEN) {
- return 0;
- }
- return 2;
- }
-
- private static int sign(long l) {
- if (l < 0)
- return -1;
- if (l > 0)
- return 1;
- return 0;
- }
-
- @Override
- public int compareTo(LogFileKey o) {
- if (eventType(this.event) != eventType(o.event)) {
- return eventType(this.event) - eventType(o.event);
- }
- if (this.event == OPEN)
- return 0;
- if (this.tid != o.tid) {
- return this.tid - o.tid;
- }
- return sign(this.seq - o.seq);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof LogFileKey) {
- return this.compareTo((LogFileKey) obj) == 0;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return (int) seq;
- }
-
- public static void printEntry(LogFileKey entry) {
- System.out.println(entry.toString());
- }
-
- @Override
- public String toString() {
- switch (event) {
- case OPEN:
- return String.format("OPEN %s", tserverSession);
- case COMPACTION_FINISH:
- return String.format("COMPACTION_FINISH %d %d", tid, seq);
- case COMPACTION_START:
- return String.format("COMPACTION_START %d %d %s", tid, seq, filename);
- case MUTATION:
- return String.format("MUTATION %d %d", tid, seq);
- case MANY_MUTATIONS:
- return String.format("MANY_MUTATIONS %d %d", tid, seq);
- case DEFINE_TABLET:
- return String.format("DEFINE_TABLET %d %d %s", tid, seq, tablet);
- }
- throw new RuntimeException("Unknown type of entry: " + event);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java b/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java
deleted file mode 100644
index 6019b18..0000000
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.accumulo.core.data.ColumnUpdate;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.server.data.ServerMutation;
-import org.apache.hadoop.io.Writable;
-
-public class LogFileValue implements Writable {
-
- private static final List<Mutation> empty = Collections.emptyList();
-
- public List<Mutation> mutations = empty;
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int count = in.readInt();
- mutations = new ArrayList<Mutation>(count);
- for (int i = 0; i < count; i++) {
- ServerMutation mutation = new ServerMutation();
- mutation.readFields(in);
- mutations.add(mutation);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(mutations.size());
- for (Mutation m : mutations) {
- m.write(out);
- }
- }
-
- public static void print(LogFileValue value) {
- System.out.println(value.toString());
- }
-
- private static String displayLabels(byte[] labels) {
- String s = new String(labels);
- s = s.replace("&", " & ");
- s = s.replace("|", " | ");
- return s;
- }
-
- public static String format(LogFileValue lfv, int maxMutations) {
- if (lfv.mutations.size() == 0)
- return "";
- StringBuilder builder = new StringBuilder();
- builder.append(lfv.mutations.size() + " mutations:\n");
- int i = 0;
- for (Mutation m : lfv.mutations) {
- if (i++ >= maxMutations) {
- builder.append("...");
- break;
- }
- builder.append(" " + new String(m.getRow()) + "\n");
- for (ColumnUpdate update : m.getUpdates()) {
- String value = new String(update.getValue());
- builder.append(" " + new String(update.getColumnFamily()) + ":" + new String(update.getColumnQualifier()) + " "
- + (update.hasTimestamp() ? "[user]:" : "[system]:") + update.getTimestamp() + " [" + displayLabels(update.getColumnVisibility()) + "] "
- + (update.isDeleted() ? "<deleted>" : value) + "\n");
- }
- }
- return builder.toString();
- }
-
- @Override
- public String toString() {
- return format(this, 5);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java b/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
deleted file mode 100644
index 9368819..0000000
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.logger;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.cli.Help;
-import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.tabletserver.log.DfsLogger;
-import org.apache.accumulo.server.tabletserver.log.DfsLogger.DFSLoggerInputStreams;
-import org.apache.accumulo.server.tabletserver.log.MultiReader;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-
-public class LogReader {
-
- static class Opts extends Help {
- @Parameter(names = "-r", description = "print only mutations associated with the given row")
- String row;
- @Parameter(names = "-m", description = "limit the number of mutations printed per row")
- int maxMutations = 5;
- @Parameter(names = "-t", description = "print only mutations that fall within the given key extent")
- String extent;
- @Parameter(names = "-p", description = "search for a row that matches the given regex")
- String regexp;
- @Parameter(description = "<logfile> { <logfile> ... }")
- List<String> files = new ArrayList<String>();
- }
-
- /**
- * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
- *
- * @param args
- * - first argument is the file to print
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- Opts opts = new Opts();
- opts.parseArgs(LogReader.class.getName(), args);
- VolumeManager fs = VolumeManagerImpl.get();
-
- Matcher rowMatcher = null;
- KeyExtent ke = null;
- Text row = null;
- if (opts.files.isEmpty()) {
- new JCommander(opts).usage();
- return;
- }
- if (opts.row != null)
- row = new Text(opts.row);
- if (opts.extent != null) {
- String sa[] = opts.extent.split(";");
- ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
- }
- if (opts.regexp != null) {
- Pattern pattern = Pattern.compile(opts.regexp);
- rowMatcher = pattern.matcher("");
- }
-
- Set<Integer> tabletIds = new HashSet<Integer>();
-
- for (String file : opts.files) {
-
- Path path = new Path(file);
- LogFileKey key = new LogFileKey();
- LogFileValue value = new LogFileValue();
-
- if (fs.isFile(path)) {
- // read log entries from a simple hdfs file
- @SuppressWarnings("deprecation")
- DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getSiteConfiguration());
- DataInputStream input = streams.getDecryptingInputStream();
-
- try {
- while (true) {
- try {
- key.readFields(input);
- value.readFields(input);
- } catch (EOFException ex) {
- break;
- }
- printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
- }
- } finally {
- input.close();
- }
- } else {
- // read the log entries sorted in a map file
- MultiReader input = new MultiReader(fs, path);
- while (input.next(key, value)) {
- printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
- }
- }
- }
- }
-
- public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {
-
- if (ke != null) {
- if (key.event == LogEvents.DEFINE_TABLET) {
- if (key.tablet.equals(ke)) {
- tabletIds.add(key.tid);
- } else {
- return;
- }
- } else if (!tabletIds.contains(key.tid)) {
- return;
- }
- }
-
- if (row != null || rowMatcher != null) {
- if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) {
- boolean found = false;
- for (Mutation m : value.mutations) {
- if (row != null && new Text(m.getRow()).equals(row)) {
- found = true;
- break;
- }
-
- if (rowMatcher != null) {
- rowMatcher.reset(new String(m.getRow()));
- if (rowMatcher.matches()) {
- found = true;
- break;
- }
- }
- }
-
- if (!found)
- return;
- } else {
- return;
- }
-
- }
-
- System.out.println(key);
- System.out.println(LogFileValue.format(value, maxMutations));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/EventCoordinator.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/EventCoordinator.java b/server/src/main/java/org/apache/accumulo/server/master/EventCoordinator.java
deleted file mode 100644
index 7449bc3..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/EventCoordinator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master;
-
-import org.apache.log4j.Logger;
-
-public class EventCoordinator {
-
- private static final Logger log = Logger.getLogger(EventCoordinator.class);
- long eventCounter = 0;
-
- synchronized long waitForEvents(long millis, long lastEvent) {
- // Did something happen since the last time we waited?
- if (lastEvent == eventCounter) {
- // no
- if (millis <= 0)
- return eventCounter;
- try {
- wait(millis);
- } catch (InterruptedException e) {
- log.debug("ignoring InterruptedException", e);
- }
- }
- return eventCounter;
- }
-
- synchronized public void event(String msg, Object... args) {
- log.info(String.format(msg, args));
- eventCounter++;
- notifyAll();
- }
-
- public Listener getListener() {
- return new Listener();
- }
-
- public class Listener {
- long lastEvent;
-
- Listener() {
- lastEvent = eventCounter;
- }
-
- public void waitForEvents(long millis) {
- lastEvent = EventCoordinator.this.waitForEvents(millis, lastEvent);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
deleted file mode 100644
index 3eef065..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master;
-
-import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NotEmptyException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import com.google.common.net.HostAndPort;
-
-public class LiveTServerSet implements Watcher {
-
- public interface Listener {
- void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added);
- }
-
- private static final Logger log = Logger.getLogger(LiveTServerSet.class);
-
- private final Listener cback;
- private final Instance instance;
- private final AccumuloConfiguration conf;
- private ZooCache zooCache;
-
- public class TServerConnection {
- private final HostAndPort address;
-
- public TServerConnection(HostAndPort addr) throws TException {
- address = addr;
- }
-
- private String lockString(ZooLock mlock) {
- return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK);
- }
-
- public void assignTablet(ZooLock lock, KeyExtent extent) throws TException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save);
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException {
-
- if (usePooledConnection == true)
- throw new UnsupportedOperationException();
-
- TTransport transport = ThriftUtil.createTransport(address, conf);
-
- try {
- TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
- return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
- } finally {
- if (transport != null)
- transport.close();
- }
- }
-
- public void halt(ZooLock lock) throws TException, ThriftSecurityException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public void fastHalt(ZooLock lock) throws TException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
- startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public void chop(ZooLock lock, KeyExtent extent) throws TException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(),
- ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength()));
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public void flushTablet(ZooLock lock, KeyExtent extent) throws TException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
- startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- public boolean isActive(long tid) throws TException {
- TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- try {
- return client.isActive(Tracer.traceInfo(), tid);
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- }
-
- static class TServerInfo {
- TServerConnection connection;
- TServerInstance instance;
-
- TServerInfo(TServerInstance instance, TServerConnection connection) {
- this.connection = connection;
- this.instance = instance;
- }
- };
-
- // The set of active tservers with locks, indexed by their name in zookeeper
- private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
- // as above, indexed by TServerInstance
- private Map<TServerInstance,TServerInfo> currentInstances = new HashMap<TServerInstance,TServerInfo>();
-
- // The set of entries in zookeeper without locks, and the first time each was noticed
- private Map<String,Long> locklessServers = new HashMap<String,Long>();
-
- public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
- this.cback = cback;
- this.instance = instance;
- this.conf = conf;
-
- }
-
- public synchronized ZooCache getZooCache() {
- if (zooCache == null)
- zooCache = new ZooCache(this);
- return zooCache;
- }
-
- public synchronized void startListeningForTabletServerChanges() {
- scanServers();
- SimpleTimer.getInstance().schedule(new Runnable() {
- @Override
- public void run() {
- scanServers();
- }
- }, 0, 5000);
- }
-
- public synchronized void scanServers() {
- try {
- final Set<TServerInstance> updates = new HashSet<TServerInstance>();
- final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-
- final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
-
- HashSet<String> all = new HashSet<String>(current.keySet());
- all.addAll(getZooCache().getChildren(path));
-
- locklessServers.keySet().retainAll(all);
-
- for (String zPath : all) {
- checkServer(updates, doomed, path, zPath);
- }
-
- // log.debug("Current: " + current.keySet());
- if (!doomed.isEmpty() || !updates.isEmpty())
- this.cback.update(this, doomed, updates);
- } catch (Exception ex) {
- log.error(ex, ex);
- }
- }
-
- private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
- try {
- ZooReaderWriter.getInstance().delete(serverNode, -1);
- } catch (NotEmptyException ex) {
- // race condition: tserver created the lock after our last check; we'll see it at the next check
- } catch (NoNodeException nne) {
- // someone else deleted it
- }
- }
-
- private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String zPath)
- throws TException, InterruptedException, KeeperException {
-
- TServerInfo info = current.get(zPath);
-
- final String lockPath = path + "/" + zPath;
- Stat stat = new Stat();
- byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
-
- if (lockData == null) {
- if (info != null) {
- doomed.add(info.instance);
- current.remove(zPath);
- currentInstances.remove(info.instance);
- }
-
- Long firstSeen = locklessServers.get(zPath);
- if (firstSeen == null) {
- locklessServers.put(zPath, System.currentTimeMillis());
- } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) {
- deleteServerNode(path + "/" + zPath);
- locklessServers.remove(zPath);
- }
- } else {
- locklessServers.remove(zPath);
- ServerServices services = new ServerServices(new String(lockData));
- HostAndPort client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
- TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
-
- if (info == null) {
- updates.add(instance);
- TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
- current.put(zPath, tServerInfo);
- currentInstances.put(instance, tServerInfo);
- } else if (!info.instance.equals(instance)) {
- doomed.add(info.instance);
- updates.add(instance);
- TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
- current.put(zPath, tServerInfo);
- currentInstances.put(info.instance, tServerInfo);
- }
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
-
- // its important that these event are propagated by ZooCache, because this ensures when reading zoocache that is has already processed the event and cleared
- // relevant nodes before code below reads from zoocache
-
- if (event.getPath() != null) {
- if (event.getPath().endsWith(Constants.ZTSERVERS)) {
- scanServers();
- } else if (event.getPath().contains(Constants.ZTSERVERS)) {
- int pos = event.getPath().lastIndexOf('/');
-
- // do only if ZTSERVER is parent
- if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
-
- String server = event.getPath().substring(pos + 1);
-
- final Set<TServerInstance> updates = new HashSet<TServerInstance>();
- final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-
- final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
-
- try {
- checkServer(updates, doomed, path, server);
- if (!doomed.isEmpty() || !updates.isEmpty())
- this.cback.update(this, doomed, updates);
- } catch (Exception ex) {
- log.error(ex, ex);
- }
- }
- }
- }
- }
-
- public synchronized TServerConnection getConnection(TServerInstance server) throws TException {
- if (server == null)
- return null;
- TServerInfo tServerInfo = currentInstances.get(server);
- if (tServerInfo == null)
- return null;
- return tServerInfo.connection;
- }
-
- public synchronized Set<TServerInstance> getCurrentServers() {
- return new HashSet<TServerInstance>(currentInstances.keySet());
- }
-
- public synchronized int size() {
- return current.size();
- }
-
- public synchronized TServerInstance find(String tabletServer) {
- HostAndPort addr = AddressUtil.parseAddress(tabletServer);
- for (Entry<String,TServerInfo> entry : current.entrySet()) {
- if (entry.getValue().instance.getLocation().equals(addr))
- return entry.getValue().instance;
- }
- return null;
- }
-
- public synchronized void remove(TServerInstance server) {
- String zPath = null;
- for (Entry<String,TServerInfo> entry : current.entrySet()) {
- if (entry.getValue().instance.equals(server)) {
- zPath = entry.getKey();
- break;
- }
- }
- if (zPath == null)
- return;
- current.remove(zPath);
- currentInstances.remove(server);
-
- log.info("Removing zookeeper lock for " + server);
- String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
- try {
- ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
- } catch (Exception e) {
- String msg = "error removing tablet server lock";
- log.fatal(msg, e);
- Halt.halt(msg, -1);
- }
- getZooCache().clear(fullpath);
- }
-}