You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:55:55 UTC
[16/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java b/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
deleted file mode 100644
index 9492bd7..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
+++ /dev/null
@@ -1,659 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master;
-
-import static java.lang.Math.min;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.master.Master.TabletGoalState;
-import org.apache.accumulo.server.master.state.Assignment;
-import org.apache.accumulo.server.master.state.DistributedStoreException;
-import org.apache.accumulo.server.master.state.MergeInfo;
-import org.apache.accumulo.server.master.state.MergeState;
-import org.apache.accumulo.server.master.state.MergeStats;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TableCounts;
-import org.apache.accumulo.server.master.state.TableStats;
-import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.accumulo.server.master.state.TabletState;
-import org.apache.accumulo.server.master.state.TabletStateStore;
-import org.apache.accumulo.server.master.state.tables.TableManager;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.hadoop.io.Text;
-import org.apache.thrift.TException;
-
-class TabletGroupWatcher extends Daemon {
-
- // Master whose tablet server set, migrations, merge state and event coordinator this watcher works against.
- private final Master master;
- // Source of the TabletLocationState entries iterated on every pass of run(); also receives unassign/setFutureLocations updates.
- final TabletStateStore store;
- // Optional watcher (may be null) whose assignedOrHosted() count is consulted before unloading tablets when every server is shutting down.
- final TabletGroupWatcher dependentWatcher;
-
- // Per-table tablet-state counts: updated during each scan in run() and exposed through getStats().
- final TableStats stats = new TableStats();
-
- /**
-  * Creates a watcher; the arguments are stored as-is and no work starts until the thread is run.
-  *
-  * @param master
-  *          the master this watcher acts on behalf of
-  * @param store
-  *          the tablet state store to scan
-  * @param dependentWatcher
-  *          another watcher consulted during full shutdown, or null
-  */
- TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
- this.master = master;
- this.store = store;
- this.dependentWatcher = dependentWatcher;
- }
-
- /**
-  * Returns the per-table counts reported by {@code stats.getLast()}.
-  * NOTE(review): presumably these are the counts from the most recently completed scan - confirm against TableStats.
-  */
- Map<Text,TableCounts> getStats() {
- return stats.getLast();
- }
-
- /**
-  * Returns the counts reported by {@code stats.getLast(tableId)} for a single table.
-  *
-  * @param tableId
-  *          id of the table to look up
-  */
- TableCounts getStats(Text tableId) {
- return stats.getLast(tableId);
- }
-
- /**
-  * Main loop of the watcher. While {@code master.stillMaster()} holds, each pass scans every tablet in {@code store} and works it toward its goal state:
-  * queuing assignments, re-sending assignment reminders, requesting unloads, and issuing chop/split requests for merges in progress. After the scan the
-  * queued changes are flushed, changed tablet-state counts are published as events, merge states are advanced, and the thread waits for the next event or
-  * for the scan interval to elapse. Any exception aborts the current pass, is logged, and the loop retries after a pause.
-  */
- @Override
- public void run() {
-
- Thread.currentThread().setName("Watching " + store.name());
- // Counts from the previous completed scan, so that only changed counts are reported as events
- int[] oldCounts = new int[TabletState.values().length];
- EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
-
- while (this.master.stillMaster()) {
- // slow things down a little, otherwise we spam the logs when there are many wake-up events
- UtilWaitThread.sleep(100);
-
- // totalUnloaded counts unload requests over the whole pass; unloaded is reset at every intermediate flush
- int totalUnloaded = 0;
- int unloaded = 0;
- try {
- // One MergeStats per table, built lazily during this pass
- Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
-
- // Get the current status for the current list of tservers
- SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
- for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) {
- currentTServers.put(entry, this.master.tserverStatus.get(entry));
- }
-
- // Nothing can be assigned or unloaded without tablet servers; wait and try again
- if (currentTServers.size() == 0) {
- eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
- continue;
- }
-
- // Don't move tablets to servers that are shutting down
- SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
- destinations.keySet().removeAll(this.master.serversToShutdown);
-
- // Work queued during the scan and handed to flushChanges():
- // assignments - new assignments decided here (finishing migrations)
- // assigned - tablets already assigned that get a reminder
- // assignedToDeadServers - tablets whose server is gone and must be unassigned in the store
- // unassigned - tablets for the balancer to place, mapped to the server taken from tls.getServer()
- List<Assignment> assignments = new ArrayList<Assignment>();
- List<Assignment> assigned = new ArrayList<Assignment>();
- List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
- Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
-
- int[] counts = new int[TabletState.values().length];
- stats.begin();
- // Walk through the tablets in our store, and work tablets
- // towards their goal
- for (TabletLocationState tls : store) {
- if (tls == null) {
- continue;
- }
- // ignore entries for tables that do not exist in zookeeper
- if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
- continue;
-
- // Don't overwhelm the tablet servers with work
- // (flush what has been queued so far, reset the queues, and pause before continuing the same scan)
- if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
- assignments.clear();
- assigned.clear();
- assignedToDeadServers.clear();
- unassigned.clear();
- unloaded = 0;
- eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
- }
- Text tableId = tls.extent.getTableId();
- MergeStats mergeStats = mergeStatsCache.get(tableId);
- if (mergeStats == null) {
- mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent)));
- }
- TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
- TServerInstance server = tls.getServer();
- TabletState state = tls.getState(currentTServers.keySet());
- stats.update(tableId, state);
- mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
- // Both requests return immediately unless the merge is in the matching state and the tablet is hosted
- sendChopRequest(mergeStats.getMergeInfo(), state, tls);
- sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
-
- // Always follow through with assignments
- if (state == TabletState.ASSIGNED) {
- goal = TabletGoalState.HOSTED;
- }
-
- // if we are shutting down all the tabletservers, we have to do it in order
- // (keep this tablet hosted while the dependent watcher still reports assigned or hosted tablets)
- if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
- if (this.master.serversToShutdown.equals(currentTServers.keySet())) {
- if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
- goal = TabletGoalState.HOSTED;
- }
- }
- }
-
- if (goal == TabletGoalState.HOSTED) {
- if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
- // NOTE(review): a true result presumably means log recovery is still in progress - confirm against RecoveryManager.
- // The tablet is skipped for this pass, which also skips the counts[] update at the bottom of the loop.
- if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
- continue;
- }
- switch (state) {
- case HOSTED:
- // The tablet is on the server its migration targeted, so the migration is finished
- if (server.equals(this.master.migrations.get(tls.extent)))
- this.master.migrations.remove(tls.extent);
- break;
- case ASSIGNED_TO_DEAD_SERVER:
- assignedToDeadServers.add(tls);
- if (server.equals(this.master.migrations.get(tls.extent)))
- this.master.migrations.remove(tls.extent);
- // log.info("Current servers " + currentTServers.keySet());
- break;
- case UNASSIGNED:
- // maybe it's a finishing migration
- TServerInstance dest = this.master.migrations.get(tls.extent);
- if (dest != null) {
- // if destination is still good, assign it
- if (destinations.keySet().contains(dest)) {
- assignments.add(new Assignment(tls.extent, dest));
- } else {
- // get rid of this migration
- this.master.migrations.remove(tls.extent);
- unassigned.put(tls.extent, server);
- }
- } else {
- unassigned.put(tls.extent, server);
- }
- break;
- case ASSIGNED:
- // Send another reminder
- assigned.add(new Assignment(tls.extent, tls.future));
- break;
- }
- } else {
- // Goal is anything other than HOSTED: only hosted tablets and tablets on dead servers need action
- switch (state) {
- case UNASSIGNED:
- break;
- case ASSIGNED_TO_DEAD_SERVER:
- assignedToDeadServers.add(tls);
- // log.info("Current servers " + currentTServers.keySet());
- break;
- case HOSTED:
- TServerConnection conn = this.master.tserverSet.getConnection(server);
- if (conn != null) {
- // NOTE(review): the boolean is false only when the goal is DELETED; presumably it tells the server whether to save tablet state - confirm
- conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED);
- unloaded++;
- totalUnloaded++;
- } else {
- Master.log.warn("Could not connect to server " + server);
- }
- break;
- case ASSIGNED:
- break;
- }
- }
- counts[state.ordinal()]++;
- }
-
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
-
- // provide stats after flushing changes to avoid race conditions w/ delete table
- stats.end();
-
- // Report changes
- for (TabletState state : TabletState.values()) {
- int i = state.ordinal();
- if (counts[i] > 0 && counts[i] != oldCounts[i]) {
- this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
- }
- }
- Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
- oldCounts = counts;
- if (totalUnloaded > 0) {
- this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
- }
-
- updateMergeState(mergeStatsCache);
-
- Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
- eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
- } catch (Exception ex) {
- // Abandon this pass; the while loop starts a fresh scan after the pause
- Master.log.error("Error processing table state for store " + store.name(), ex);
- UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
- }
- }
- }
-
- /**
-  * Adds up the assigned and hosted tablet counts of every table in {@code stats.getLast()}.
-  *
-  * @return the combined number of assigned and hosted tablets
-  */
- private int assignedOrHosted() {
-   Map<Text,TableCounts> lastCounts = stats.getLast();
-   int total = 0;
-   for (TableCounts perTable : lastCounts.values()) {
-     total += perTable.assigned();
-     total += perTable.hosted();
-   }
-   return total;
- }
-
- /**
-  * During the SPLITTING phase of a delete-range operation, asks the hosting tablet server to split {@code tls.extent} at each end point of the delete range
-  * that falls strictly inside the tablet. Does nothing for plain merges, for tablets that are not hosted, or for tablets that do not overlap the range.
-  * Failures are logged and never propagated.
-  *
-  * @param info
-  *          the merge/delete operation for the tablet's table
-  * @param state
-  *          the tablet's current state
-  * @param tls
-  *          the tablet's location information
-  */
- private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
-   // Only a delete that is still splitting, on a hosted tablet, needs a split request
-   boolean splitting = info.getState().equals(MergeState.SPLITTING);
-   if (!splitting || !info.isDelete() || !state.equals(TabletState.HOSTED)) {
-     return;
-   }
-   KeyExtent deleteRange = info.getExtent();
-   if (!tls.extent.overlaps(deleteRange)) {
-     return;
-   }
-   // Candidate split points are the two end points of the delete range
-   Text[] endPoints = new Text[] {deleteRange.getPrevEndRow(), deleteRange.getEndRow()};
-   for (Text endPoint : endPoints) {
-     // Skip end points that are unbounded, outside this tablet, or already one of its boundaries
-     boolean splitNeeded = endPoint != null && tls.extent.contains(endPoint) && !endPoint.equals(tls.extent.getEndRow())
-         && !endPoint.equals(tls.extent.getPrevEndRow());
-     if (!splitNeeded) {
-       continue;
-     }
-     try {
-       TServerConnection connection = this.master.tserverSet.getConnection(tls.current);
-       if (connection == null) {
-         Master.log.warn("Not connected to server " + tls.current);
-       } else {
-         Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + endPoint);
-         connection.splitTablet(this.master.masterLock, tls.extent, endPoint);
-       }
-     } catch (NotServingTabletException e) {
-       Master.log.debug("Error asking tablet server to split a tablet: " + e);
-     } catch (Exception e) {
-       Master.log.warn("Error asking tablet server to split a tablet: " + e);
-     }
-   }
- }
-
- /**
-  * While a merge is in the WAITING_FOR_CHOPPED state, asks the hosting tablet server to chop {@code tls.extent} if the tablet is hosted, not yet chopped,
-  * and {@code info.needsToBeChopped} says the merge requires it. Communication failures are logged, with their cause, and never propagated.
-  *
-  * @param info
-  *          the merge operation for the tablet's table
-  * @param state
-  *          the tablet's current state
-  * @param tls
-  *          the tablet's location information
-  */
- private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
-   // Don't bother if we're in the wrong state
-   if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED)) {
-     return;
-   }
-   // Tablet must be online
-   if (!state.equals(TabletState.HOSTED)) {
-     return;
-   }
-   // Tablet is already chopped
-   if (tls.chopped) {
-     return;
-   }
-   // Only tablets the merge actually needs chopped get a request
-   if (!info.needsToBeChopped(tls.extent)) {
-     return;
-   }
-   try {
-     TServerConnection conn = this.master.tserverSet.getConnection(tls.current);
-     if (conn != null) {
-       Master.log.info("Asking " + tls.current + " to chop " + tls.extent);
-       conn.chop(this.master.masterLock, tls.extent);
-     } else {
-       Master.log.warn("Could not connect to server " + tls.current);
-     }
-   } catch (TException e) {
-     // Pass the exception to the logger so the cause and stack trace are not lost
-     Master.log.warn("Communications error asking tablet server to chop a tablet", e);
-   }
- }
-
- /**
-  * Advances every merge tracked in {@code mergeStatsCache} to its next state and, when that state is MERGING, performs the metadata work (tablet deletion
-  * for a delete-range, record merging otherwise) before marking the merge COMPLETE. Failures are logged per merge and do not stop the remaining merges.
-  *
-  * @param mergeStatsCache
-  *          merge statistics gathered during the last scan, keyed by table id
-  */
- private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
-   for (MergeStats mergeStats : mergeStatsCache.values()) {
-     try {
-       MergeState next = mergeStats.nextMergeState(this.master.getConnector(), this.master);
-       // A finished merge is recorded as "no merge"
-       if (next == MergeState.COMPLETE) {
-         next = MergeState.NONE;
-       }
-       // The new state (including MERGING) is persisted before any merge work starts: the verification
-       // done before entering MERGING could fail later if a merge had started but not finished
-       if (next != mergeStats.getMergeInfo().getState()) {
-         this.master.setMergeState(mergeStats.getMergeInfo(), next);
-       }
-       if (next != MergeState.MERGING) {
-         continue;
-       }
-       try {
-         if (mergeStats.getMergeInfo().isDelete()) {
-           deleteTablets(mergeStats.getMergeInfo());
-         } else {
-           mergeMetadataRecords(mergeStats.getMergeInfo());
-         }
-         this.master.setMergeState(mergeStats.getMergeInfo(), MergeState.COMPLETE);
-       } catch (Exception ex) {
-         Master.log.error("Unable merge metadata table records", ex);
-       }
-     } catch (Exception ex) {
-       Master.log.error("Unable to update merge state for merge " + mergeStats.getMergeInfo().getExtent(), ex);
-     }
-   }
- }
-
- /**
-  * Performs the metadata work for a delete-range operation: records delete entries for every data file and tablet directory found in the range, deletes
-  * the metadata rows of the tablets in the range, and then either points the tablet following the range at the range's previous end row or, when the range
-  * runs to the end of the table, re-creates the last tablet.
-  *
-  * @param info
-  *          the delete operation; its extent is the range being removed
-  * @throws AccumuloException
-  *           wrapping any failure, including the IllegalStateException raised when a tablet in the range still has a current location
-  */
- private void deleteTablets(MergeInfo info) throws AccumuloException {
- KeyExtent extent = info.getExtent();
- // Metadata about metadata tablets lives in the root table; everything else is in the metadata table
- String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME;
- Master.log.debug("Deleting tablets for " + extent);
- // Stays '\0' unless a time column is seen in the scan below
- char timeType = '\0';
- KeyExtent followingTablet = null;
- if (extent.getEndRow() != null) {
- // Look up the tablet that starts right after the deleted range; this happens before anything is deleted
- Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW);
- followingTablet = getHighTablet(new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow()));
- Master.log.debug("Found following tablet " + followingTablet);
- }
- try {
- Connector conn = this.master.getConnector();
- Text start = extent.getPrevEndRow();
- if (start == null) {
- start = new Text();
- }
- Master.log.debug("Making file deletion entries for " + extent);
- // Start key exclusive, end key inclusive
- Range deleteRange = new Range(KeyExtent.getMetadataEntry(extent.getTableId(), start), false, KeyExtent.getMetadataEntry(extent.getTableId(),
- extent.getEndRow()), true);
- Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
- scanner.setRange(deleteRange);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
- TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
- Set<FileRef> datafiles = new TreeSet<FileRef>();
- for (Entry<Key,Value> entry : scanner) {
- Key key = entry.getKey();
- if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
- datafiles.add(new FileRef(this.master.fs, key));
- // Write delete entries in batches once more than 1000 have accumulated
- if (datafiles.size() > 1000) {
- MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
- datafiles.clear();
- }
- } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
- // First character of the time value; passed to addTablet below if the last tablet is re-created
- timeType = entry.getValue().toString().charAt(0);
- } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
- // A current location means the tablet is still hosted; abort rather than delete it
- throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
- } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
- // The tablet directory is queued for deletion alongside the data files
- datafiles.add(new FileRef(entry.getValue().toString(), this.master.fs.getFullPath(FileType.TABLE, entry.getValue().toString())));
- if (datafiles.size() > 1000) {
- MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
- datafiles.clear();
- }
- }
- }
- // Write whatever is left from the last partial batch
- MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
- // The tablet rows are removed only after all delete entries have been written
- BatchWriter bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
- try {
- deleteTablets(info, deleteRange, bw, conn);
- } finally {
- bw.close();
- }
-
- if (followingTablet != null) {
- Master.log.debug("Updating prevRow of " + followingTablet + " to " + extent.getPrevEndRow());
- bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
- try {
- // One mutation updates the prev-row of the following tablet and clears its chopped marker
- Mutation m = new Mutation(followingTablet.getMetadataEntry());
- TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(extent.getPrevEndRow()));
- ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
- bw.addMutation(m);
- bw.flush();
- } finally {
- bw.close();
- }
- } else {
- // Recreate the default tablet to hold the end of the table
- Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow());
- String tdir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + "/" + extent.getTableId() + Constants.DEFAULT_TABLET_LOCATION;
- MetadataTableUtil.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), tdir,
- SystemCredentials.get(), timeType, this.master.masterLock);
- }
- } catch (Exception ex) {
- throw new AccumuloException(ex);
- }
- }
-
- /**
-  * Performs the metadata work for a merge: copies the data file entries of every tablet in the range into the highest tablet's row, records delete entries
-  * for the directories of the merged-away tablets, writes the maximum logical time, moves the highest tablet's prev-row back to the start of the range,
-  * deletes the rows of the other tablets, and finally clears the chopped marker on the highest tablet.
-  *
-  * @param info
-  *          the merge operation; its extent is the range being merged
-  * @throws AccumuloException
-  *           wrapping any failure, including a failure to close the batch writer
-  */
- private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
- KeyExtent range = info.getExtent();
- Master.log.debug("Merging metadata for " + range);
- KeyExtent stop = getHighTablet(range);
- Master.log.debug("Highest tablet is " + stop);
- Value firstPrevRowValue = null;
- Text stopRow = stop.getMetadataEntry();
- Text start = range.getPrevEndRow();
- if (start == null) {
- start = new Text();
- }
- // Both ends exclusive: covers the tablets in the range below the highest tablet, whose own row (stopRow) is left out
- Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
- String targetSystemTable = MetadataTable.NAME;
- if (range.isMeta()) {
- targetSystemTable = RootTable.NAME;
- }
-
- BatchWriter bw = null;
- try {
- long fileCount = 0;
- Connector conn = this.master.getConnector();
- // Make file entries in highest tablet
- bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
- Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
- scanner.setRange(scanRange);
- TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
- TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- // Accumulates everything that is written to the highest tablet's row
- Mutation m = new Mutation(stopRow);
- String maxLogicalTime = null;
- for (Entry<Key,Value> entry : scanner) {
- Key key = entry.getKey();
- Value value = entry.getValue();
- if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- // Re-home the file entry under the highest tablet
- m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
- fileCount++;
- } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
- // Only the first prev-row seen (the lowest tablet's) is kept; it becomes the merged tablet's prev-row
- Master.log.debug("prevRow entry for lowest tablet is " + value);
- firstPrevRowValue = new Value(value);
- } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
- maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
- } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
- // The merged-away tablet's directory is queued for deletion
- bw.addMutation(MetadataTableUtil.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
- }
- }
-
- // read the logical time from the last tablet in the merge range, it is not included in
- // the loop above
- scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
- scanner.setRange(new Range(stopRow));
- TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
- for (Entry<Key,Value> entry : scanner) {
- if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
- maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
- }
- }
-
- if (maxLogicalTime != null)
- TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
-
- if (!m.getUpdates().isEmpty()) {
- bw.addMutation(m);
- }
-
- // File entries and time are flushed to the highest tablet before its prev-row changes or any row is deleted
- bw.flush();
-
- Master.log.debug("Moved " + fileCount + " files to " + stop);
-
- // No prev-row entry was found below the highest tablet, so there is nothing left to merge
- if (firstPrevRowValue == null) {
- Master.log.debug("tablet already merged");
- return;
- }
-
- stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
- Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
- Master.log.debug("Setting the prevRow for last tablet: " + stop);
- bw.addMutation(updatePrevRow);
- bw.flush();
-
- // Remove the rows of the tablets that were merged into the highest tablet
- deleteTablets(info, scanRange, bw, conn);
-
- // Clean-up the last chopped marker
- m = new Mutation(stopRow);
- ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
- bw.addMutation(m);
- bw.flush();
-
- } catch (Exception ex) {
- throw new AccumuloException(ex);
- } finally {
- // NOTE(review): an exception thrown by close() here replaces any exception already propagating from the try block
- if (bw != null)
- try {
- bw.close();
- } catch (Exception ex) {
- throw new AccumuloException(ex);
- }
- }
- }
-
- /**
-  * Deletes every metadata entry inside {@code scanRange}, writing one mutation per row so that each tablet's entries are removed by a single mutation.
-  * The writer is flushed before returning but is not closed; the caller owns it.
-  *
-  * @param info
-  *          the merge/delete operation; decides whether the root table or the metadata table is scanned
-  * @param scanRange
-  *          the range of metadata rows to delete
-  * @param bw
-  *          writer that receives the delete mutations
-  * @param conn
-  *          connector used to create the scanner
-  */
- private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
-   String systemTable = info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME;
-   Scanner rangeScanner = conn.createScanner(systemTable, Authorizations.EMPTY);
-   Master.log.debug("Deleting range " + scanRange);
-   rangeScanner.setRange(scanRange);
-
-   // All deletes for one row go into one mutation, so a tablet either disappears entirely or not at all.
-   // That matters if the process terminates part-way through this loop.
-   RowIterator rows = new RowIterator(rangeScanner);
-   while (rows.hasNext()) {
-     Iterator<Entry<Key,Value>> cells = rows.next();
-     Mutation rowDeletes = null;
-     while (cells.hasNext()) {
-       Key cellKey = cells.next().getKey();
-       if (rowDeletes == null) {
-         rowDeletes = new Mutation(cellKey.getRow());
-       }
-       rowDeletes.putDelete(cellKey.getColumnFamily(), cellKey.getColumnQualifier());
-       Master.log.debug("deleting entry " + cellKey);
-     }
-     bw.addMutation(rowDeletes);
-   }
-
-   bw.flush();
- }
-
- /**
-  * Finds the tablet containing the end row of {@code range} by scanning the prev-row column of the root table (for metadata extents) or the metadata table
-  * (otherwise), starting at the metadata row for the range's end row.
-  *
-  * @param range
-  *          the extent whose last tablet is wanted
-  * @return the extent built from the first prev-row entry at or after the range's end row
-  * @throws AccumuloException
-  *           if no entry is found, the entry belongs to a different table, or the scan fails; exceptions raised inside the method are re-wrapped with the
-  *           "Unexpected failure" message
-  */
- private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
-   try {
-     Connector conn = this.master.getConnector();
-     Scanner scanner = conn.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
-     TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-     KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
-     scanner.setRange(new Range(start.getMetadataEntry(), null));
-     Iterator<Entry<Key,Value>> iterator = scanner.iterator();
-     if (!iterator.hasNext()) {
-       throw new AccumuloException("No last tablet for a merge " + range);
-     }
-     Entry<Key,Value> entry = iterator.next();
-     KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
-     // Table ids are Text objects, so compare them by value; an identity check only passes when both extents share one Text instance
-     if (!highTablet.getTableId().equals(range.getTableId())) {
-       throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
-     }
-     return highTablet;
-   } catch (Exception ex) {
-     throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
-   }
- }
-
- /**
-  * Applies the work queued during a scan, in this order: unassigns tablets whose server is gone, asks the balancer to place the unassigned tablets, records
-  * the new assignments as future locations in the store, then sends an assign request to the target server for every new and previously assigned tablet.
-  * The {@code assignments} list is modified: balancer results and the {@code assigned} reminders are appended to it.
-  *
-  * @param currentTServers
-  *          servers offered to the balancer as assignment targets
-  * @param assignments
-  *          assignments already decided by the caller; extended in place
-  * @param assigned
-  *          tablets that already have a future location and get a reminder
-  * @param assignedToDeadServers
-  *          tablets to unassign in the store
-  * @param unassigned
-  *          tablets nominated for placement by the balancer
-  */
- private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
-     List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
-   // Step 1: tablets that point at dead servers go back to unassigned (at most 100 are listed in the log)
-   if (!assignedToDeadServers.isEmpty()) {
-     int shown = Math.min(assignedToDeadServers.size(), 100);
-     Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, shown) + "...");
-     store.unassign(assignedToDeadServers);
-     this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
-   }
-
-   // Step 2: let the balancer place the nominated tablets; it only gets read-only views of its inputs
-   if (!currentTServers.isEmpty()) {
-     Map<KeyExtent,TServerInstance> proposed = new HashMap<KeyExtent,TServerInstance>();
-     this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), proposed);
-     for (Entry<KeyExtent,TServerInstance> proposal : proposed.entrySet()) {
-       if (!unassigned.containsKey(proposal.getKey())) {
-         Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + proposal.getKey());
-         continue;
-       }
-       if (proposal.getValue() != null) {
-         Master.log.debug(store.name() + " assigning tablet " + proposal);
-         assignments.add(new Assignment(proposal.getKey(), proposal.getValue()));
-       }
-     }
-     if (!unassigned.isEmpty() && proposed.isEmpty()) {
-       Master.log.warn("Load balancer failed to assign any tablets");
-     }
-   }
-
-   // Step 3: persist the new assignments before any server is asked to load them
-   if (!assignments.isEmpty()) {
-     Master.log.info(String.format("Assigning %d tablets", assignments.size()));
-     store.setFutureLocations(assignments);
-   }
-
-   // Step 4: send assign requests for both the new assignments and the reminders
-   assignments.addAll(assigned);
-   for (Assignment pending : assignments) {
-     TServerConnection connection = this.master.tserverSet.getConnection(pending.server);
-     if (connection == null) {
-       Master.log.warn("Could not connect to server " + pending.server);
-     } else {
-       connection.assignTablet(this.master.masterLock, pending.tablet);
-     }
-     master.assignedTablet(pending.tablet);
-   }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
deleted file mode 100644
index ec3371c..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.balancer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TableInfo;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletMigration;
-import org.apache.thrift.TException;
-
-/**
- * A chaotic load balancer used for testing. It constantly shuffles tablets, preventing them from resting in a single location for very long. This is not
- * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer.
- */
-public class ChaoticLoadBalancer extends TabletBalancer {
- // Source of randomness for picking assignment targets and migration destinations.
- Random r = new Random();
-
- /**
-  * Assigns each unassigned tablet to a randomly chosen server. Servers hosting fewer tablets than the computed average are preferred, each up to the
-  * number of tablets it is below that average. When no such server remains, the tablet goes to a random server from {@code current} rather than failing.
-  *
-  * @param current
-  *          the servers available for assignment and their status; nothing is assigned when empty
-  * @param unassigned
-  *          the tablets to place
-  * @param assignments
-  *          output map receiving one tablet-to-server entry per placed tablet
-  */
- @Override
- public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
-     Map<KeyExtent,TServerInstance> assignments) {
-   // Without servers there is nowhere to assign to
-   if (current.isEmpty()) {
-     return;
-   }
-   long total = assignments.size() + unassigned.size();
-   long avg = (long) Math.ceil(((double) total) / current.size());
-   Map<TServerInstance,Long> toAssign = new HashMap<TServerInstance,Long>();
-   List<TServerInstance> tServerArray = new ArrayList<TServerInstance>();
-   for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
-     long numTablets = 0;
-     for (TableInfo ti : e.getValue().getTableMap().values()) {
-       numTablets += ti.tablets;
-     }
-     if (numTablets < avg) {
-       tServerArray.add(e.getKey());
-       toAssign.put(e.getKey(), avg - numTablets);
-     }
-   }
-
-   // Fallback targets for when every server is at or above the average, or the preferred capacity runs out mid-loop
-   List<TServerInstance> allServers = new ArrayList<TServerInstance>(current.keySet());
-
-   for (KeyExtent ke : unassigned.keySet()) {
-     if (tServerArray.isEmpty()) {
-       // Random.nextInt(0) would throw, so pick from the full server list instead
-       assignments.put(ke, allServers.get(r.nextInt(allServers.size())));
-       continue;
-     }
-     int index = r.nextInt(tServerArray.size());
-     TServerInstance dest = tServerArray.get(index);
-     assignments.put(ke, dest);
-     long remaining = toAssign.get(dest).longValue() - 1;
-     if (remaining == 0) {
-       // This server has reached the average; stop preferring it
-       tServerArray.remove(index);
-       toAssign.remove(dest);
-     } else {
-       toAssign.put(dest, remaining);
-     }
-   }
- }
-
- /**
-  * Proposes a migration of every online tablet to a randomly chosen server, skipping a tablet when the draw lands on the server that already hosts it.
-  * Nothing is proposed while earlier migrations are still pending. Metadata table tablets are moved in roughly one call out of four.
-  *
-  * @param current
-  *          the live servers and their status
-  * @param migrations
-  *          migrations still in progress
-  * @param migrationsOut
-  *          output list receiving the proposed migrations
-  * @return always 100 (NOTE(review): presumably the delay before the next balance call - confirm against TabletBalancer)
-  */
- @Override
- public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
-   if (!migrations.isEmpty()) {
-     return 100;
-   }
-
-   Map<TServerInstance,Long> tabletsPerServer = new HashMap<TServerInstance,Long>();
-   List<TServerInstance> candidates = new ArrayList<TServerInstance>();
-
-   boolean moveMetadata = r.nextInt(4) == 0;
-   long grandTotal = 0;
-   for (Entry<TServerInstance,TabletServerStatus> serverEntry : current.entrySet()) {
-     long hosted = 0;
-     for (TableInfo info : serverEntry.getValue().getTableMap().values()) {
-       hosted += info.tablets;
-     }
-     tabletsPerServer.put(serverEntry.getKey(), hosted);
-     candidates.add(serverEntry.getKey());
-     grandTotal += hosted;
-   }
-   // The total is fuzzy because the stats are gathered asynchronously; the 1.2 factor absorbs that fuzziness
-   // and prevents locking for 'perfect' balancing scenarios
-   long avg = (long) Math.ceil(((double) grandTotal) / current.size() * 1.2);
-
-   for (Entry<TServerInstance,TabletServerStatus> serverEntry : current.entrySet()) {
-     TServerInstance source = serverEntry.getKey();
-     for (String table : serverEntry.getValue().getTableMap().keySet()) {
-       if (!moveMetadata && MetadataTable.NAME.equals(table)) {
-         continue;
-       }
-       try {
-         for (TabletStats tabletStats : getOnlineTabletsForTable(source, table)) {
-           KeyExtent extent = new KeyExtent(tabletStats.extent);
-           int pick = r.nextInt(candidates.size());
-           TServerInstance target = candidates.get(pick);
-           if (target.equals(source)) {
-             continue;
-           }
-           migrationsOut.add(new TabletMigration(extent, source, target));
-
-           // put() returns the count from before the update, and that earlier value is what gets compared with the average
-           Long targetBefore = tabletsPerServer.put(target, tabletsPerServer.get(target) + 1);
-           if (targetBefore > avg) {
-             candidates.remove(pick);
-           }
-           Long sourceBefore = tabletsPerServer.put(source, tabletsPerServer.get(source) - 1);
-           if (sourceBefore <= avg && !candidates.contains(source)) {
-             candidates.add(source);
-           }
-
-           // With a single tserver the candidate list can run dry; refill it so there is always an option
-           if (candidates.isEmpty()) {
-             candidates.addAll(tabletsPerServer.keySet());
-           }
-         }
-       } catch (ThriftSecurityException e1) {
-         // Shouldn't happen, but carry on if it does
-         e1.printStackTrace();
-       } catch (TException e1) {
-         // Shouldn't happen, but carry on if it does
-         e1.printStackTrace();
-       }
-     }
-   }
-
-   return 100;
- }
-
- /**
-  * Passes the configuration straight to the superclass; this balancer does no setup of its own.
-  *
-  * @param conf
-  *          the server configuration handed to {@code super.init}
-  */
- @Override
- public void init(ServerConfiguration conf) {
- super.init(conf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
deleted file mode 100644
index 9b88d74..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.balancer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TableInfo;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletMigration;
-import org.apache.log4j.Logger;
-
-public class DefaultLoadBalancer extends TabletBalancer {
-
- private static final Logger log = Logger.getLogger(DefaultLoadBalancer.class);
-
- Iterator<TServerInstance> assignments;
- // if tableToBalance is set, then only balance the given table
- String tableToBalance = null;
-
- /**
- * Creates a balancer that is not restricted to one table: tableToBalance keeps its initial null value.
- */
- public DefaultLoadBalancer() {
- // intentionally empty
- }
-
- /**
- * Creates a balancer that only considers tablets of the given table.
- *
- * @param table
- *          the table to balance; stored in tableToBalance
- */
- public DefaultLoadBalancer(String table) {
- this.tableToBalance = table;
- }
-
- /**
- * Returns the given servers as a new list in random order; the input set itself is left untouched.
- */
- List<TServerInstance> randomize(Set<TServerInstance> locations) {
- List<TServerInstance> shuffled = new ArrayList<TServerInstance>();
- shuffled.addAll(locations);
- Collections.shuffle(shuffled);
- return shuffled;
- }
-
- /**
- * Picks a server for one tablet.
- *
- * @param locations
- *          the candidate servers, sorted
- * @param extent
- *          the tablet being assigned (not consulted by this implementation)
- * @param last
- *          the server that previously held the tablet, or null
- * @return the chosen server, or null when there are no candidates
- */
- public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations, KeyExtent extent, TServerInstance last) {
- // Without any candidate server there is nothing to hand out.
- if (locations.isEmpty())
- return null;
-
- // Maintain locality: probe the sorted map starting at the previous location and accept the
- // first entry found there, but only if it is on the same host as the previous server.
- if (last != null) {
- TServerInstance probe = new TServerInstance(last.getLocation(), "");
- Iterator<TServerInstance> candidates = locations.tailMap(probe).keySet().iterator();
- if (candidates.hasNext()) {
- TServerInstance candidate = candidates.next();
- if (candidate.host().equals(last.host()))
- return candidate;
- }
- }
-
- // Otherwise hand servers back one at a time from a shuffled snapshot, taking a fresh snapshot
- // whenever the previous one has been used up.
- if (assignments == null || !assignments.hasNext())
- assignments = randomize(locations.keySet()).iterator();
- TServerInstance next = assignments.next();
- if (locations.containsKey(next))
- return next;
-
- // The snapshot produced a server that is not in the current list: drop the snapshot and
- // answer with a random current server instead.
- assignments = null;
- return randomize(locations.keySet()).iterator().next();
- }
-
- /**
- * A tablet server together with its status and the number of online tablets counted for it. Instances order by count first and by server second.
- */
- static class ServerCounts implements Comparable<ServerCounts> {
- public final TServerInstance server;
- public final int count;
- public final TabletServerStatus status;
-
- ServerCounts(int count, TServerInstance server, TabletServerStatus status) {
- this.count = count;
- this.server = server;
- this.status = status;
- }
-
- /**
- * Orders by ascending count, breaking ties with the server's own ordering.
- *
- * @return a negative value, zero or a positive value as this object sorts before, equal to or after obj
- */
- public int compareTo(ServerCounts obj) {
- // Compare explicitly instead of returning count - obj.count: the subtraction can overflow and report the wrong sign.
- if (count < obj.count)
- return -1;
- if (count > obj.count)
- return 1;
- return server.compareTo(obj.server);
- }
- }
-
- public boolean getMigrations(Map<TServerInstance,TabletServerStatus> current, List<TabletMigration> result) { // appends proposed migrations to result; returns true when some server still has more to unload than was moved
- boolean moreBalancingNeeded = false;
- try {
- // no moves possible with fewer than two servers
- if (current.size() < 2) {
- return false;
- }
-
- // Count the online tablets per server (only those of tableToBalance when it is set) and the overall total
- int total = 0;
- ArrayList<ServerCounts> totals = new ArrayList<ServerCounts>();
- for (Entry<TServerInstance,TabletServerStatus> entry : current.entrySet()) {
- int serverTotal = 0;
- if (entry.getValue() != null && entry.getValue().tableMap != null) {
- for (Entry<String,TableInfo> e : entry.getValue().tableMap.entrySet()) {
- /**
- * The check below was on entry.getKey(), but that resolves to a tabletserver not a tablename. Believe it should be e.getKey() which is a tablename
- */
- if (tableToBalance == null || tableToBalance.equals(e.getKey()))
- serverTotal += e.getValue().onlineTablets;
- }
- }
- totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue()));
- total += serverTotal;
- }
-
- // order from high to low: sort ascending, then reverse so the most loaded servers come first
- Collections.sort(totals);
- Collections.reverse(totals);
- int even = total / totals.size(); // base goal for every server
- int numServersOverEven = total % totals.size(); // the first this-many servers get a goal of even + 1
-
- // Move tablets from the servers with too many to the servers with
- // the fewest but only nominate tablets to move once. This allows us
- // to fill new servers with tablets from a mostly balanced server
- // very quickly. However, it may take several balancing passes to move
- // tablets from one hugely overloaded server to many slightly
- // under-loaded servers.
- int end = totals.size() - 1; // index of the least loaded server that is still being filled
- int movedAlready = 0; // tablets already nominated for totals.get(end) by earlier iterations
- for (int tooManyIndex = 0; tooManyIndex < totals.size(); tooManyIndex++) {
- ServerCounts tooMany = totals.get(tooManyIndex);
- int goal = even;
- if (tooManyIndex < numServersOverEven) {
- goal++;
- }
- int needToUnload = tooMany.count - goal;
- ServerCounts tooLittle = totals.get(end);
- int needToLoad = goal - tooLittle.count - movedAlready;
- if (needToUnload < 1 && needToLoad < 1) {
- break;
- }
- if (needToUnload >= needToLoad) {
- // the source can fill the destination completely: top it up and advance to the next least loaded server
- result.addAll(move(tooMany, tooLittle, needToLoad));
- end--;
- movedAlready = 0;
- } else {
- // the source runs out first: unload what it has to spare and keep filling the same destination
- result.addAll(move(tooMany, tooLittle, needToUnload));
- movedAlready += needToUnload;
- }
- if (needToUnload > needToLoad)
- moreBalancingNeeded = true;
- }
-
- } finally {
- log.debug("balance ended with " + result.size() + " migrations");
- }
- return moreBalancingNeeded;
- }
-
- /**
- * Simple holder pairing a table with a difference value.
- */
- static class TableDiff {
- int diff;
- String table;
-
- public TableDiff(int diffValue, String tableName) {
- diff = diffValue;
- table = tableName;
- }
- };
-
- /**
- * Select a tablet based on differences between table loads; if the loads are even, use the busiest table
- *
- * Proposes up to count migrations from tooMuch to tooLittle. For each one a table is chosen: tableToBalance when it is set, otherwise the table whose tablet
- * count differs most between the two servers, or the busiest table on the source when that difference is less than 2. The online tablets of the chosen table
- * are fetched from the source server once and cached per table for the rest of the call; one tablet is then picked with selectTablet and removed from the cache
- * so it is not nominated twice. The per-table counts of both servers are adjusted after every proposal.
- *
- * @param tooMuch
- *          the server to take tablets from
- * @param tooLittle
- *          the server to give tablets to
- * @param count
- *          the number of migrations to propose
- * @return the proposed migrations; fewer than count when the tablet list cannot be fetched or no tablet is left to select
- */
- List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count) {
-
- List<TabletMigration> result = new ArrayList<TabletMigration>();
- if (count == 0)
- return result;
-
- Map<String,Map<KeyExtent,TabletStats>> onlineTablets = new HashMap<String,Map<KeyExtent,TabletStats>>(); // per-table cache of the source server's online tablets
- // Copy counts so we can update them as we propose migrations
- Map<String,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
- Map<String,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
-
- for (int i = 0; i < count; i++) {
- String table;
- Integer tooLittleCount;
- if (tableToBalance == null) {
- // find a table to migrate
- // look for an uneven table count
- int biggestDifference = 0;
- String biggestDifferenceTable = null;
- for (Entry<String,Integer> tableEntry : tooMuchMap.entrySet()) {
- String tableID = tableEntry.getKey();
- if (tooLittleMap.get(tableID) == null)
- tooLittleMap.put(tableID, 0);
- int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
- if (diff > biggestDifference) {
- biggestDifference = diff;
- biggestDifferenceTable = tableID;
- }
- }
- if (biggestDifference < 2) {
- // the tables are spread (nearly) evenly, so fall back to the busiest table on the source
- table = busiest(tooMuch.status.tableMap);
- } else {
- table = biggestDifferenceTable;
- }
- } else {
- // just balance the given table
- table = tableToBalance;
- }
- Map<KeyExtent,TabletStats> onlineTabletsForTable = onlineTablets.get(table);
- try {
- if (onlineTabletsForTable == null) {
- // first use of this table in this call: ask the source server for its online tablets
- onlineTabletsForTable = new HashMap<KeyExtent,TabletStats>();
- for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server, table))
- onlineTabletsForTable.put(new KeyExtent(stat.extent), stat);
- onlineTablets.put(table, onlineTabletsForTable);
- }
- } catch (Exception ex) {
- log.error("Unable to select a tablet to move", ex);
- return result;
- }
- KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable);
- onlineTabletsForTable.remove(extent); // drop it from the cache so it cannot be selected again
- if (extent == null)
- return result;
- tooMuchMap.put(table, tooMuchMap.get(table) - 1);
- /**
- * If a table grows from 1 tablet then tooLittleMap.get(table) can return a null, since there is only one tabletserver that holds all of the tablets. Here
- * we check to see if in fact that is the case and if so set the value to 0.
- */
- tooLittleCount = tooLittleMap.get(table);
- if (tooLittleCount == null) {
- tooLittleCount = 0;
- }
- tooLittleMap.put(table, tooLittleCount + 1);
-
- result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server));
- }
- return result;
- }
-
- /**
- * Builds a fresh map from table to the number of online tablets reported in the given status.
- *
- * @param status
- *          the server status to read; may be null or lack a table map
- * @return a new map, empty when the status or its table map is null
- */
- static Map<String,Integer> tabletCountsPerTable(TabletServerStatus status) {
- Map<String,Integer> counts = new HashMap<String,Integer>();
- if (status == null || status.tableMap == null)
- return counts;
- for (Entry<String,TableInfo> tableEntry : status.tableMap.entrySet())
- counts.put(tableEntry.getKey(), tableEntry.getValue().onlineTablets);
- return counts;
- }
-
- /**
- * Picks the tablet with the largest splitCreationTime among the given candidates. Candidates with a negative splitCreationTime are never picked, and among
- * equal times the one iterated last wins.
- *
- * @param tserver
- *          the server holding the tablets (not consulted)
- * @param extents
- *          the candidate tablets with their statistics
- * @return the selected tablet, or null when nothing qualifies (for example when extents is empty)
- */
- static KeyExtent selectTablet(TServerInstance tserver, Map<KeyExtent,TabletStats> extents) {
- KeyExtent chosen = null;
- long newestSplit = 0;
- for (Entry<KeyExtent,TabletStats> candidate : extents.entrySet()) {
- if (candidate.getValue().splitCreationTime < newestSplit)
- continue;
- newestSplit = candidate.getValue().splitCreationTime;
- chosen = candidate.getKey();
- }
- return chosen;
- }
-
- // A table's busyness is its ingest rate plus its query rate; returns the key of the busiest
- // table, or null when the map is empty.
- private static String busiest(Map<String,TableInfo> tables) {
- String winner = null;
- double highest = Double.NEGATIVE_INFINITY;
- for (Entry<String,TableInfo> candidate : tables.entrySet()) {
- TableInfo stats = candidate.getValue();
- double load = stats.ingestRate + stats.queryRate;
- if (load > highest) {
- winner = candidate.getKey();
- highest = load;
- }
- }
- return winner;
- }
-
- @Override
- public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
- Map<KeyExtent,TServerInstance> assignments) {
- // Each unassigned tablet is handled on its own: its last known server is offered to getAssignment as a hint.
- for (Entry<KeyExtent,TServerInstance> request : unassigned.entrySet()) {
- KeyExtent tablet = request.getKey();
- TServerInstance chosen = getAssignment(current, tablet, request.getValue());
- assignments.put(tablet, chosen);
- }
- }
-
- @Override
- public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
- // do we have any servers?
- if (current.size() > 0) {
- // Don't migrate if we have migrations in progress
- if (migrations.size() == 0) {
- if (getMigrations(current, migrationsOut))
- return 1 * 1000;
- }
- }
- return 5 * 1000;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
deleted file mode 100644
index 3e0a2bf..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.balancer;
-
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletMigration;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.apache.log4j.Logger;
-
-public class TableLoadBalancer extends TabletBalancer {
-
- private static final Logger log = Logger.getLogger(TableLoadBalancer.class);
-
- Map<String,TabletBalancer> perTableBalancers = new HashMap<String,TabletBalancer>();
-
- /**
- * Loads the named balancer class and instantiates it through its public constructor taking a single String, passing the table.
- *
- * @throws Exception
- *           if the class cannot be loaded, has no such constructor, or cannot be instantiated
- */
- private TabletBalancer constructNewBalancerForTable(String clazzName, String table) throws Exception {
- Class<? extends TabletBalancer> balancerClass = AccumuloVFSClassLoader.loadClass(clazzName, TabletBalancer.class);
- return balancerClass.getConstructor(String.class).newInstance(table);
- }
-
- protected String getLoadBalancerClassNameForTable(String table) { // protected, so subclasses can supply the class name some other way
- return configuration.getTableConfiguration(table).get(Property.TABLE_LOAD_BALANCER); // value of the table's TABLE_LOAD_BALANCER property
- }
-
- /**
- * Returns the balancer responsible for the given table, creating, caching and initializing one when needed.
- *
- * A cached balancer whose class no longer matches the configured class name is replaced if the configured class can be constructed; otherwise the cached one
- * is kept. When nothing is cached and the configured class cannot be constructed, a DefaultLoadBalancer for the table is used.
- */
- protected TabletBalancer getBalancerForTable(String table) {
- TabletBalancer balancer = perTableBalancers.get(table);
-
- String clazzName = getLoadBalancerClassNameForTable(table);
- if (clazzName == null)
- clazzName = DefaultLoadBalancer.class.getName();
-
- if (balancer != null && !clazzName.equals(balancer.getClass().getName())) {
- // the cached balancer is not of the class named in the configuration: try to swap it out
- try {
- TabletBalancer replacement = constructNewBalancerForTable(clazzName, table);
- if (replacement != null) {
- balancer = replacement;
- perTableBalancers.put(table, balancer);
- balancer.init(configuration);
- }
- } catch (Exception e) {
- log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e);
- }
- }
-
- if (balancer != null)
- return balancer;
-
- // nothing cached yet: build the configured balancer, falling back to the default implementation
- try {
- balancer = constructNewBalancerForTable(clazzName, table);
- log.info("Loaded class " + clazzName + " for table " + table);
- } catch (Exception e) {
- log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e);
- }
- if (balancer == null) {
- log.info("Using balancer " + DefaultLoadBalancer.class.getName() + " for table " + table);
- balancer = new DefaultLoadBalancer(table);
- }
- perTableBalancers.put(table, balancer);
- balancer.init(configuration);
- return balancer;
- }
-
- @Override
- public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
- Map<KeyExtent,TServerInstance> assignments) {
- // bucket the unassigned tablets by the id of the table they belong to
- Map<String,Map<KeyExtent,TServerInstance>> byTable = new HashMap<String,Map<KeyExtent,TServerInstance>>();
- for (Entry<KeyExtent,TServerInstance> pending : unassigned.entrySet()) {
- String tableId = pending.getKey().getTableId().toString();
- Map<KeyExtent,TServerInstance> bucket = byTable.get(tableId);
- if (bucket == null) {
- bucket = new HashMap<KeyExtent,TServerInstance>();
- byTable.put(tableId, bucket);
- }
- bucket.put(pending.getKey(), pending.getValue());
- }
- // let each table's own balancer place that table's tablets, then merge its answers into the output
- for (Entry<String,Map<KeyExtent,TServerInstance>> group : byTable.entrySet()) {
- Map<KeyExtent,TServerInstance> tableAssignments = new HashMap<KeyExtent,TServerInstance>();
- TabletBalancer tableBalancer = getBalancerForTable(group.getKey());
- tableBalancer.getAssignments(current, group.getValue(), tableAssignments);
- assignments.putAll(tableAssignments);
- }
- }
-
- private TableOperations tops = null;
-
- /**
- * Returns the cached TableOperations, creating it with the system credentials on first use.
- *
- * @return the table operations, or null when obtaining the connector failed (the failure is logged and retried on the next call)
- */
- protected TableOperations getTableOperations() {
- if (tops != null)
- return tops;
- try {
- tops = configuration.getInstance().getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()).tableOperations();
- } catch (AccumuloException e) {
- log.error("Unable to access table operations from within table balancer", e);
- } catch (AccumuloSecurityException e) {
- log.error("Unable to access table operations from within table balancer", e);
- }
- return tops;
- }
-
- @Override
- public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
- long minBalanceTime = 5 * 1000;
- TableOperations t = getTableOperations();
- // without table operations the tables cannot be listed, so just report the default wait
- if (t == null)
- return minBalanceTime;
- // run every table's balancer, collect all proposed migrations and keep the shortest requested wait
- for (String s : t.tableIdMap().values()) {
- ArrayList<TabletMigration> proposed = new ArrayList<TabletMigration>();
- long requestedWait = getBalancerForTable(s).balance(current, migrations, proposed);
- minBalanceTime = Math.min(minBalanceTime, requestedWait);
- migrationsOut.addAll(proposed);
- }
- return minBalanceTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
deleted file mode 100644
index fd76ce2..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.balancer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletMigration;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-
-public abstract class TabletBalancer {
-
- private static final Logger log = Logger.getLogger(TabletBalancer.class);
-
- protected ServerConfiguration configuration;
-
- /**
- * Initialize the TabletBalancer by remembering the server configuration, which is how a balancer gets the opportunity to read its settings.
- *
- * @param conf
- *          the configuration to keep in the configuration field
- */
- public void init(ServerConfiguration conf) {
- this.configuration = conf;
- }
-
- /**
- * Assign tablets to tablet servers. This method is called whenever the master finds tablets that are unassigned.
- *
- * @param current
- * The current table-summary state of all the online tablet servers. Read-only. The TabletServerStatus for each server may be null if the tablet
- * server has not yet responded to a recent request for status.
- * @param unassigned
- * A map from unassigned tablet to the last known tablet server. Read-only.
- * @param assignments
- * A map from tablet to assigned server. Write-only.
- */
- abstract public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
- Map<KeyExtent,TServerInstance> assignments);
-
- /**
- * Ask the balancer if any migrations are necessary.
- *
- * @param current
- * The current table-summary state of all the online tablet servers. Read-only.
- * @param migrations
- * the current set of migrations. Read-only.
- * @param migrationsOut
- * new migrations to perform; should not contain tablets in the current set of migrations. Write-only.
- * @return the time, in milliseconds, to wait before re-balancing.
- *
- * This method will not be called when there are unassigned tablets.
- */
- public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
-
- /**
- * Ask a tablet server for the statistics of the tablets it hosts for one table. Useful if your balance strategy needs details at the tablet level to decide
- * what tablets to move. The thrift client is handed back to ThriftUtil whether or not the call succeeds.
- *
- * @param tserver
- *          The tablet server to ask.
- * @param tableId
- *          The table id
- * @return a list of tablet statistics, or null when a transport problem prevented the request (the problem is logged)
- * @throws ThriftSecurityException
- *           tablet server disapproves of your internal System password.
- * @throws TException
- *           any other problem
- */
- public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException {
- log.debug("Scanning tablet server " + tserver + " for table " + tableId);
- Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), tserver.getLocation(), configuration.getConfiguration());
- try {
- return client.getTabletStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(configuration.getInstance()), tableId);
- } catch (TTransportException e) {
- log.error("Unable to connect to " + tserver + ": " + e);
- return null;
- } finally {
- ThriftUtil.returnClient(client);
- }
- }
-
- /**
- * Filters the migrations produced by balance() down to those that are consistent:
- * <ul>
- * <li>the tablet is not null
- * <li>source and destination tablet servers are not null
- * <li>source and destination tablet servers are both in the current set
- * </ul>
- * Every rejected migration is logged as a warning naming the first check it failed.
- *
- * @param current
- *          the tablet servers regarded as current
- * @param migrations
- *          the proposed migrations to check
- * @return A new list of the TabletMigration objects that passed the sanity checks, in their original order.
- */
- public static List<TabletMigration> checkMigrationSanity(Set<TServerInstance> current, List<TabletMigration> migrations) {
- List<TabletMigration> accepted = new ArrayList<TabletMigration>(migrations.size());
- for (TabletMigration proposal : migrations) {
- if (proposal.tablet == null) {
- log.warn("Balancer gave back a null tablet " + proposal);
- } else if (proposal.newServer == null) {
- log.warn("Balancer did not set the destination " + proposal);
- } else if (proposal.oldServer == null) {
- log.warn("Balancer did not set the source " + proposal);
- } else if (!current.contains(proposal.oldServer)) {
- log.warn("Balancer wants to move a tablet from a server that is not current: " + proposal);
- } else if (!current.contains(proposal.newServer)) {
- log.warn("Balancer wants to move a tablet to a server that is not current: " + proposal);
- } else {
- accepted.add(proposal);
- }
- }
- return accepted;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
deleted file mode 100644
index 3eda844..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.recovery;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.log4j.Logger;
-
-public class HadoopLogCloser implements LogCloser {
-
- private static Logger log = Logger.getLogger(HadoopLogCloser.class);
-
- /**
- * Makes sure the given file is closed. On a DistributedFileSystem this asks for lease recovery and, if that call fails with anything other than
- * FileNotFoundException, falls back to opening the file for append and closing it. On a LocalFileSystem nothing is done.
- *
- * @param master
- *          used to look up Property.MASTER_LEASE_RECOVERY_WAITING_PERIOD
- * @param fs
- *          used to find the file system that holds source
- * @param source
- *          the file to close
- * @return the configured lease recovery waiting period when the lease has not been recovered yet, otherwise 0
- * @throws IOException
- *           FileNotFoundException from lease recovery is rethrown as is; the append fallback may also fail
- * @throws IllegalStateException
- *           if the file system is neither a DistributedFileSystem nor a LocalFileSystem
- */
- @Override
- public long close(Master master, VolumeManager fs, Path source) throws IOException {
- FileSystem ns = fs.getFileSystemByPath(source);
- if (ns instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem) ns;
- try {
- if (!dfs.recoverLease(source)) {
- log.info("Waiting for file to be closed " + source.toString());
- return master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_LEASE_RECOVERY_WAITING_PERIOD);
- }
- log.info("Recovered lease on " + source.toString());
- } catch (FileNotFoundException ex) {
- // a missing file must reach the caller untouched instead of triggering the append fallback below
- throw ex;
- } catch (Exception ex) {
- log.warn("Error recovering lease on " + source.toString(), ex);
- ns.append(source).close();
- log.info("Recovered lease on " + source.toString() + " using append");
- }
- } else if (ns instanceof LocalFileSystem) {
- // ignore
- } else {
- // report the class of the file system that was actually inspected, not of the VolumeManager wrapper
- throw new IllegalStateException("Don't know how to recover a lease for " + ns.getClass().getName());
- }
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
deleted file mode 100644
index 42497ff..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.recovery;
-
-import java.io.IOException;
-
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.Path;
-
- /**
- * Strategy for making sure a file is closed.
- */
- public interface LogCloser {
- /**
- * Attempts to close the file at the given path.
- *
- * @return NOTE(review): presumably the time in milliseconds to wait before trying again, with 0 meaning no retry is needed; confirm against the caller
- * @throws IOException
- *           if the attempt fails
- */
- long close(Master master, VolumeManager fs, Path path) throws IOException;
- }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
deleted file mode 100644
index a5bb0c7..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.recovery;
-
-import java.io.IOException;
-
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.log4j.Logger;
-
-public class MapRLogCloser implements LogCloser {
-
- private static Logger log = Logger.getLogger(MapRLogCloser.class);
-
- /**
- * Recovers the file by setting its permission to 0444 (read-only) on the file system that holds it.
- *
- * @return 0 when the permission was changed, 1000 when the attempt failed with an IOException (which is logged, not thrown)
- */
- @Override
- public long close(Master m, VolumeManager fs, Path path) throws IOException {
- log.info("Recovering file " + path.toString() + " by changing permission to readonly");
- FileSystem ns = fs.getFileSystemByPath(path);
- try {
- ns.setPermission(path, new FsPermission((short) 0444));
- } catch (IOException ex) {
- log.error("error recovering lease ", ex);
- // lets do this again
- return 1000;
- }
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
deleted file mode 100644
index 5ce7a66..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.recovery;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Drives recovery of write-ahead logs on behalf of the master. A log is first closed through a {@link LogCloser}; once the closer reports success, an entry
- * for the log is added to the recovery work queue in ZooKeeper.
- */
-public class RecoveryManager {
-
-  private static final Logger log = Logger.getLogger(RecoveryManager.class);
-
-  // Work queue entries are encoded explicitly instead of relying on the platform default charset.
-  private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
-
-  // Cap (five minutes, in milliseconds) for the doubling delay applied before a log's close task is scheduled.
-  private static final long MAX_RECOVERY_DELAY = 1000 * 60 * 5L;
-
-  // The three collections below are only touched while holding this object's monitor (the constructor aside).
-  private final Map<String,Long> recoveryDelay = new HashMap<String,Long>();
-  private final Set<String> closeTasksQueued = new HashSet<String>();
-  private final Set<String> sortsQueued = new HashSet<String>();
-  private final ScheduledExecutorService executor;
-  private final Master master;
-  private final ZooCache zooCache;
-
-  /**
-   * Creates the manager and seeds the set of queued sorts with the work already present in the recovery work queue. A failure to read the queue is logged
-   * and otherwise ignored.
-   *
-   * @param master
-   *          the master this manager works for
-   */
-  public RecoveryManager(Master master) {
-    this.master = master;
-    executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
-    zooCache = new ZooCache();
-    try {
-      List<String> workIDs = new DistributedWorkQueue(recoveryRoot()).getWorkQueued();
-      sortsQueued.addAll(workIDs);
-    } catch (Exception e) {
-      log.warn(e, e);
-    }
-  }
-
-  /**
-   * @return the ZooKeeper path of the recovery work queue for the master's instance
-   */
-  private String recoveryRoot() {
-    return ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY;
-  }
-
-  /**
-   * Closes one log and, once the closer reports success, queues it for sorting. While the closer asks for more time the task re-schedules itself.
-   */
-  private class LogSortTask implements Runnable {
-    private final String source;
-    private final String destination;
-    private final String sortId;
-    private final LogCloser closer;
-
-    public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
-      this.closer = closer;
-      this.source = source;
-      this.destination = destination;
-      this.sortId = sortId;
-    }
-
-    @Override
-    public void run() {
-      boolean rescheduled = false;
-      try {
-
-        long time = closer.close(master, master.getFileSystem(), new Path(source));
-
-        if (time > 0) {
-          // the closer is not done yet; try again after the delay it asked for
-          executor.schedule(this, time, TimeUnit.MILLISECONDS);
-          rescheduled = true;
-        } else {
-          initiateSort(sortId, source, destination);
-        }
-      } catch (FileNotFoundException e) {
-        log.debug("Unable to initate log sort for " + source + ": " + e);
-      } catch (Exception e) {
-        log.warn("Failed to initiate log sort " + source, e);
-      } finally {
-        // the close task stays registered only while another run of it is pending
-        if (!rescheduled) {
-          synchronized (RecoveryManager.this) {
-            closeTasksQueued.remove(sortId);
-          }
-        }
-      }
-    }
-
-  }
-
-  /**
-   * Adds a {@code source|destination} entry for {@code sortId} to the recovery work queue and records the sort as queued.
-   */
-  private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
-    final String root = recoveryRoot();
-    String work = source + "|" + destination;
-    new DistributedWorkQueue(root).addWork(sortId, work.getBytes(UTF8));
-
-    synchronized (this) {
-      sortsQueued.add(sortId);
-    }
-
-    final String path = root + "/" + sortId;
-    log.info("Created zookeeper entry " + path + " with data " + work);
-  }
-
-  /**
-   * Checks every referenced write-ahead log and schedules a close task for each one that is neither finished nor already being worked on.
-   *
-   * @param extent
-   *          the tablet holding the references; only used in the log message
-   * @param walogs
-   *          groups of log references; each reference is split at its first '/' into a host part and a file part, and the last path component of the file
-   *          part is used as the sort id. NOTE(review): a reference without any '/' fails with ArrayIndexOutOfBoundsException.
-   * @return true if at least one referenced log has no "finished" marker in its recovery directory yet
-   * @throws IOException
-   *           if a file system operation fails
-   */
-  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
-    boolean recoveryNeeded = false;
-    for (Collection<String> logs : walogs) {
-      for (String walog : logs) {
-        String hostFilename[] = walog.split("/", 2);
-        String host = hostFilename[0];
-        String filename = hostFilename[1];
-        String parts[] = filename.split("/");
-        String sortId = parts[parts.length - 1];
-        String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId;
-        filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString();
-        log.debug("Recovering " + filename + " to " + dest);
-
-        boolean sortQueued;
-        synchronized (this) {
-          sortQueued = sortsQueued.contains(sortId);
-        }
-
-        // a sort we believe is queued but whose work entry is gone from ZooKeeper is no longer queued
-        if (sortQueued && zooCache.get(recoveryRoot() + "/" + sortId) == null) {
-          synchronized (this) {
-            sortsQueued.remove(sortId);
-          }
-        }
-
-        // the "finished" marker means this log needs no further work; drop all bookkeeping for it
-        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
-          synchronized (this) {
-            closeTasksQueued.remove(sortId);
-            recoveryDelay.remove(sortId);
-            sortsQueued.remove(sortId);
-          }
-          continue;
-        }
-
-        recoveryNeeded = true;
-        synchronized (this) {
-          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
-            AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
-            LogCloser closer = Property.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser());
-            // first attempt uses the configured delay, every later attempt doubles it up to the cap
-            Long delay = recoveryDelay.get(sortId);
-            if (delay == null) {
-              delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
-            } else {
-              delay = Math.min(2 * delay, MAX_RECOVERY_DELAY);
-            }
-
-            log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
-
-            executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
-            closeTasksQueued.add(sortId);
-            recoveryDelay.put(sortId, delay);
-          }
-        }
-      }
-    }
-    return recoveryNeeded;
-  }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java b/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
deleted file mode 100644
index 40b7a93..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import org.apache.accumulo.core.data.KeyExtent;
-
-/**
- * Plain holder pairing a tablet with a tablet server instance.
- */
-public class Assignment {
-  /** The tablet of this pairing. */
-  public KeyExtent tablet;
-  /** The server instance of this pairing. */
-  public TServerInstance server;
-
-  /**
-   * @param tablet
-   *          value stored in {@link #tablet}
-   * @param server
-   *          value stored in {@link #server}
-   */
-  public Assignment(KeyExtent tablet, TServerInstance server) {
-    this.server = server;
-    this.tablet = tablet;
-  }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
deleted file mode 100644
index f4d98bf..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * Read-only view of the current state: the online tables, the online tablet servers and the merges.
- */
-public interface CurrentState {
-
-  /**
-   * @return the online tables (presumably table ids; confirm against implementations)
-   */
-  Set<String> onlineTables();
-
-  /**
-   * @return the tablet server instances that are online
-   */
-  Set<TServerInstance> onlineTabletServers();
-
-  /**
-   * @return the merges known to this state
-   */
-  Collection<MergeInfo> merges();
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java b/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
deleted file mode 100644
index b2ea7d6..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.accumulo.core.master.thrift.DeadServer;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Keeps a list of dead servers as child nodes of one ZooKeeper path: the node name is the server and the node data is the cause that was posted for it.
- * Every ZooKeeper failure is logged and never propagated to the caller.
- */
-public class DeadServerList {
-  private static final Logger log = Logger.getLogger(DeadServerList.class);
-
-  // Causes are written and read with a fixed charset so that JVMs with different default charsets agree on the text.
-  private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
-
-  private final String path;
-
-  /**
-   * @param path
-   *          ZooKeeper path under which the dead servers are recorded; it is created if necessary
-   */
-  public DeadServerList(String path) {
-    this.path = path;
-    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    try {
-      zoo.mkdirs(path);
-    } catch (Exception ex) {
-      log.error("Unable to make parent directories of " + path, ex);
-    }
-  }
-
-  /**
-   * Reads all recorded dead servers.
-   *
-   * @return one {@link DeadServer} per child node, built from the node name, the node's modification time and its data; if reading fails part way, the
-   *         entries collected so far are returned
-   */
-  public List<DeadServer> getList() {
-    List<DeadServer> result = new ArrayList<DeadServer>();
-    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    try {
-      List<String> children = zoo.getChildren(path);
-      if (children != null) {
-        for (String child : children) {
-          Stat stat = new Stat();
-          byte[] data = zoo.getData(path + "/" + child, stat);
-          DeadServer server = new DeadServer(child, stat.getMtime(), new String(data, UTF8));
-          result.add(server);
-        }
-      }
-    } catch (Exception ex) {
-      log.error(ex, ex);
-    }
-    return result;
-  }
-
-  /**
-   * Removes the record of {@code server}.
-   *
-   * @param server
-   *          name of the child node to delete
-   */
-  public void delete(String server) {
-    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    try {
-      zoo.recursiveDelete(path + "/" + server, NodeMissingPolicy.SKIP);
-    } catch (Exception ex) {
-      log.error(ex, ex);
-    }
-  }
-
-  /**
-   * Records {@code server} with the given cause, written with {@link NodeExistsPolicy#SKIP}.
-   *
-   * @param server
-   *          name of the child node to write
-   * @param cause
-   *          text stored as the node's data
-   */
-  public void post(String server, String cause) {
-    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    try {
-      zoo.putPersistentData(path + "/" + server, cause.getBytes(UTF8), NodeExistsPolicy.SKIP);
-    } catch (Exception ex) {
-      log.error(ex, ex);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java b/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
deleted file mode 100644
index ad658df..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import java.util.List;
-
-/**
- * An abstract version of ZooKeeper that we can write tests against.
- */
-public interface DistributedStore {
-
-  /**
-   * @param path
-   *          the path whose children are requested
-   * @return the children of {@code path}
-   * @throws DistributedStoreException
-   *           if the store cannot complete the request
-   */
-  List<String> getChildren(String path) throws DistributedStoreException;
-
-  /**
-   * @param path
-   *          the path to read
-   * @return the bytes held at {@code path}
-   * @throws DistributedStoreException
-   *           if the store cannot complete the request
-   */
-  byte[] get(String path) throws DistributedStoreException;
-
-  /**
-   * @param path
-   *          the path to write
-   * @param bs
-   *          the bytes to hold at {@code path}
-   * @throws DistributedStoreException
-   *           if the store cannot complete the request
-   */
-  void put(String path, byte[] bs) throws DistributedStoreException;
-
-  /**
-   * @param path
-   *          the path to remove
-   * @throws DistributedStoreException
-   *           if the store cannot complete the request
-   */
-  void remove(String path) throws DistributedStoreException;
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java b/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
deleted file mode 100644
index 3d3a725..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-/**
- * Checked exception raised by {@code DistributedStore} operations.
- */
-public class DistributedStoreException extends Exception {
-
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * @param why
-   *          the detail message
-   * @param cause
-   *          the underlying exception
-   */
-  public DistributedStoreException(String why, Exception cause) {
-    super(why, cause);
-  }
-
-  /**
-   * @param cause
-   *          the underlying exception; its string form becomes the detail message
-   */
-  public DistributedStoreException(Exception cause) {
-    super(cause);
-  }
-
-  /**
-   * @param why
-   *          the detail message
-   */
-  public DistributedStoreException(String why) {
-    super(why);
-  }
-}