You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/06/24 21:29:41 UTC
svn commit: r1496171 [2/3] - in /accumulo/trunk:
core/src/main/java/org/apache/accumulo/core/client/admin/
core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org/apache/accumulo/core/client/mock/
core/src/main/java/org/apache/a...
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java?rev=1496171&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java Mon Jun 24 19:29:39 2013
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master;
+
+import static java.lang.Math.min;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.RootTable;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.Master.TabletGoalState;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MergeStats;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TableCounts;
+import org.apache.accumulo.server.master.state.TableStats;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.master.state.tables.TableManager;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.tabletserver.TabletTime;
+import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+
+/**
+ * Daemon that watches the tablets in a single TabletStateStore and drives each
+ * tablet toward the goal state computed by the Master: flushing assignments,
+ * unloading hosted tablets, requesting chops/splits, and carrying out merge
+ * and range-delete operations on the metadata records.
+ */
+class TabletGroupWatcher extends Daemon {
+
+ private final Master master;
+ // the tablet state store this watcher iterates over each scan
+ final TabletStateStore store;
+ // another watcher whose tablets must be drained first during a full
+ // shutdown (see run()); presumably the next metadata level -- TODO confirm
+ final TabletGroupWatcher dependentWatcher;
+
+ // per-table tablet-state counts collected during the most recent scan
+ final TableStats stats = new TableStats();
+
+ TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
+ this.master = master;
+ this.store = store;
+ this.dependentWatcher = dependentWatcher;
+ }
+
+ /** @return per-table counts from the last completed scan */
+ Map<Text,TableCounts> getStats() {
+ return stats.getLast();
+ }
+
+ /** @return counts from the last completed scan for the given table */
+ TableCounts getStats(Text tableId) {
+ return stats.getLast(tableId);
+ }
+
+ /**
+ * Main watcher loop. Each pass scans every tablet in the store, compares the
+ * tablet's observed state against its goal state, and batches up the work
+ * (assignments, unloads, chop/split requests) needed to converge them.
+ * Runs until this process stops being the master; any per-pass failure is
+ * logged and retried after a delay.
+ */
+ @Override
+ public void run() {
+
+ Thread.currentThread().setName("Watching " + store.name());
+ int[] oldCounts = new int[TabletState.values().length];
+ EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
+
+ while (this.master.stillMaster()) {
+ int totalUnloaded = 0;
+ int unloaded = 0;
+ try {
+ Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
+
+ // Get the current status for the current list of tservers
+ SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
+ for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) {
+ currentTServers.put(entry, this.master.tserverStatus.get(entry));
+ }
+
+ if (currentTServers.size() == 0) {
+ // nothing can be assigned anywhere; wait for a server to appear
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ continue;
+ }
+
+ // Don't move tablets to servers that are shutting down
+ SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
+ destinations.keySet().removeAll(this.master.serversToShutdown);
+
+ List<Assignment> assignments = new ArrayList<Assignment>();
+ List<Assignment> assigned = new ArrayList<Assignment>();
+ List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
+ Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
+
+ int[] counts = new int[TabletState.values().length];
+ stats.begin();
+ // Walk through the tablets in our store, and work tablets
+ // towards their goal
+ for (TabletLocationState tls : store) {
+ if (tls == null) {
+ continue;
+ }
+ // ignore entries for tables that do not exist in zookeeper
+ if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
+ continue;
+
+ // Don't overwhelm the tablet servers with work
+ if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+ assignments.clear();
+ assigned.clear();
+ assignedToDeadServers.clear();
+ unassigned.clear();
+ unloaded = 0;
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ }
+ Text tableId = tls.extent.getTableId();
+ // one MergeStats per table, created lazily on first tablet seen
+ MergeStats mergeStats = mergeStatsCache.get(tableId);
+ if (mergeStats == null) {
+ mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent)));
+ }
+ TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
+ TServerInstance server = tls.getServer();
+ TabletState state = tls.getState(currentTServers.keySet());
+ stats.update(tableId, state);
+ mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
+ sendChopRequest(mergeStats.getMergeInfo(), state, tls);
+ sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
+
+ // Always follow through with assignments
+ if (state == TabletState.ASSIGNED) {
+ goal = TabletGoalState.HOSTED;
+ }
+
+ // if we are shutting down all the tabletservers, we have to do it in order
+ if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
+ if (this.master.serversToShutdown.equals(currentTServers.keySet())) {
+ // keep hosting until the dependent watcher's tablets are drained
+ if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
+ goal = TabletGoalState.HOSTED;
+ }
+ }
+ }
+
+ if (goal == TabletGoalState.HOSTED) {
+ // recover write-ahead logs before hosting; skip this tablet until done
+ if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
+ if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
+ continue;
+ }
+ switch (state) {
+ case HOSTED:
+ if (server.equals(this.master.migrations.get(tls.extent)))
+ this.master.migrations.remove(tls.extent);
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ assignedToDeadServers.add(tls);
+ if (server.equals(this.master.migrations.get(tls.extent)))
+ this.master.migrations.remove(tls.extent);
+ // log.info("Current servers " + currentTServers.keySet());
+ break;
+ case UNASSIGNED:
+ // maybe it's a finishing migration
+ TServerInstance dest = this.master.migrations.get(tls.extent);
+ if (dest != null) {
+ // if destination is still good, assign it
+ if (destinations.keySet().contains(dest)) {
+ assignments.add(new Assignment(tls.extent, dest));
+ } else {
+ // get rid of this migration
+ this.master.migrations.remove(tls.extent);
+ unassigned.put(tls.extent, server);
+ }
+ } else {
+ unassigned.put(tls.extent, server);
+ }
+ break;
+ case ASSIGNED:
+ // Send another reminder
+ assigned.add(new Assignment(tls.extent, tls.future));
+ break;
+ }
+ } else {
+ switch (state) {
+ case UNASSIGNED:
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ assignedToDeadServers.add(tls);
+ // log.info("Current servers " + currentTServers.keySet());
+ break;
+ case HOSTED:
+ TServerConnection conn = this.master.tserverSet.getConnection(server);
+ if (conn != null) {
+ conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED);
+ unloaded++;
+ totalUnloaded++;
+ } else {
+ Master.log.warn("Could not connect to server " + server);
+ }
+ break;
+ case ASSIGNED:
+ break;
+ }
+ }
+ counts[state.ordinal()]++;
+ }
+
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+
+ // provide stats after flushing changes to avoid race conditions w/ delete table
+ stats.end();
+
+ // Report changes
+ for (TabletState state : TabletState.values()) {
+ int i = state.ordinal();
+ if (counts[i] > 0 && counts[i] != oldCounts[i]) {
+ this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
+ }
+ }
+ Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
+ oldCounts = counts;
+ if (totalUnloaded > 0) {
+ this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
+ }
+
+ updateMergeState(mergeStatsCache);
+
+ Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ } catch (Exception ex) {
+ Master.log.error("Error processing table state for store " + store.name(), ex);
+ UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
+ }
+ }
+ }
+
+ /** Total of assigned plus hosted tablets across all tables in the last completed scan. */
+ private int assignedOrHosted() {
+ int total = 0;
+ for (Entry<Text,TableCounts> entry : stats.getLast().entrySet()) {
+ TableCounts tableCounts = entry.getValue();
+ total += tableCounts.assigned();
+ total += tableCounts.hosted();
+ }
+ return total;
+ }
+
+ /**
+ * Asks the hosting tablet server to split a tablet at the end points of a
+ * range-delete, so the deleted range lines up with tablet boundaries.
+ * Only acts while the operation is a delete in the SPLITTING state and the
+ * tablet is hosted; otherwise returns without doing anything.
+ *
+ * @param info merge/delete operation in progress
+ * @param state current observed state of the tablet
+ * @param tls location state of the tablet being examined
+ */
+ private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+ // Already split?
+ if (!info.getState().equals(MergeState.SPLITTING))
+ return;
+ // Merges don't split
+ if (!info.isDelete())
+ return;
+ // Online and ready to split?
+ if (!state.equals(TabletState.HOSTED))
+ return;
+ // Does this extent cover the end points of the delete?
+ KeyExtent range = info.getExtent();
+ if (tls.extent.overlaps(range)) {
+ for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) {
+ if (splitPoint == null)
+ continue;
+ if (!tls.extent.contains(splitPoint))
+ continue;
+ // splitting at an existing tablet boundary would be a no-op
+ if (splitPoint.equals(tls.extent.getEndRow()))
+ continue;
+ if (splitPoint.equals(tls.extent.getPrevEndRow()))
+ continue;
+ try {
+ TServerConnection conn = this.master.tserverSet.getConnection(tls.current);
+ if (conn != null) {
+ Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);
+ conn.splitTablet(this.master.masterLock, tls.extent, splitPoint);
+ } else {
+ Master.log.warn("Not connected to server " + tls.current);
+ }
+ } catch (NotServingTabletException e) {
+ // expected race: the tablet moved; it will be retried next scan
+ Master.log.debug("Error asking tablet server to split a tablet: " + e);
+ } catch (Exception e) {
+ // unexpected failure: log with the stack trace instead of just e.toString()
+ Master.log.warn("Error asking tablet server to split a tablet", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Asks the hosting tablet server to chop a tablet whose range intersects an
+ * in-progress merge. Only acts while the merge is WAITING_FOR_CHOPPED, the
+ * tablet is hosted, and the tablet has not already been chopped.
+ *
+ * @param info merge operation in progress
+ * @param state current observed state of the tablet
+ * @param tls location state of the tablet being examined
+ */
+ private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+ // Don't bother if we're in the wrong state
+ if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
+ return;
+ // Tablet must be online
+ if (!state.equals(TabletState.HOSTED))
+ return;
+ // Tablet isn't already chopped
+ if (tls.chopped)
+ return;
+ // Tablet ranges intersect
+ if (info.needsToBeChopped(tls.extent)) {
+ TServerConnection conn;
+ try {
+ conn = this.master.tserverSet.getConnection(tls.current);
+ if (conn != null) {
+ Master.log.info("Asking " + tls.current + " to chop " + tls.extent);
+ conn.chop(this.master.masterLock, tls.extent);
+ } else {
+ Master.log.warn("Could not connect to server " + tls.current);
+ }
+ } catch (TException e) {
+ // include the exception so the failure cause is not silently dropped
+ Master.log.warn("Communications error asking tablet server to chop a tablet", e);
+ }
+ }
+ }
+
+ /**
+ * Advances the persisted state of every merge observed in the last scan, and
+ * once a merge reaches MERGING performs the actual metadata update (delete
+ * the range, or merge the records into the highest tablet).
+ *
+ * @param mergeStatsCache per-table merge statistics gathered during the scan
+ */
+ private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
+ // note: renamed from "stats" to avoid shadowing the TableStats field
+ for (MergeStats mergeStats : mergeStatsCache.values()) {
+ try {
+ MergeState update = mergeStats.nextMergeState(this.master.getConnector(), this.master);
+ // when next state is MERGING, its important to persist this before
+ // starting the merge... the verification check that is done before
+ // moving into the merging state could fail if merge starts but does
+ // not finish
+ if (update == MergeState.COMPLETE)
+ update = MergeState.NONE;
+ if (update != mergeStats.getMergeInfo().getState()) {
+ this.master.setMergeState(mergeStats.getMergeInfo(), update);
+ }
+
+ if (update == MergeState.MERGING) {
+ try {
+ if (mergeStats.getMergeInfo().isDelete()) {
+ deleteTablets(mergeStats.getMergeInfo());
+ } else {
+ mergeMetadataRecords(mergeStats.getMergeInfo());
+ }
+ this.master.setMergeState(mergeStats.getMergeInfo(), update = MergeState.COMPLETE);
+ } catch (Exception ex) {
+ Master.log.error("Unable to merge metadata table records", ex);
+ }
+ }
+ } catch (Exception ex) {
+ Master.log.error("Unable to update merge state for merge " + mergeStats.getMergeInfo().getExtent(), ex);
+ }
+ }
+ }
+
+ /**
+ * Carries out a range-delete on the metadata: queues file-deletion entries
+ * for the data files of all tablets in the range, deletes those tablets'
+ * metadata rows, and then either stretches the following tablet backwards
+ * over the deleted range or recreates the table's last (default) tablet.
+ *
+ * @param info the delete operation to carry out
+ * @throws AccumuloException if reading or updating the metadata fails
+ */
+ private void deleteTablets(MergeInfo info) throws AccumuloException {
+ KeyExtent extent = info.getExtent();
+ String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME;
+ Master.log.debug("Deleting tablets for " + extent);
+ // time-type character read from the range's TIME column; '\0' until one is seen
+ char timeType = '\0';
+ KeyExtent followingTablet = null;
+ if (extent.getEndRow() != null) {
+ Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW);
+ followingTablet = getHighTablet(new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow()));
+ Master.log.debug("Found following tablet " + followingTablet);
+ }
+ try {
+ Connector conn = this.master.getConnector();
+ Text start = extent.getPrevEndRow();
+ if (start == null) {
+ start = new Text();
+ }
+ Master.log.debug("Making file deletion entries for " + extent);
+ Range deleteRange = new Range(KeyExtent.getMetadataEntry(extent.getTableId(), start), false, KeyExtent.getMetadataEntry(extent.getTableId(),
+ extent.getEndRow()), true);
+ Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+ scanner.setRange(deleteRange);
+ MetadataTable.DIRECTORY_COLUMN.fetch(scanner);
+ MetadataTable.TIME_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY);
+ Set<FileRef> datafiles = new TreeSet<FileRef>();
+ for (Entry<Key,Value> entry : scanner) {
+ Key key = entry.getKey();
+ if (key.compareColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY) == 0) {
+ datafiles.add(new FileRef(this.master.fs, key));
+ // flush deletion entries in batches so the set stays bounded
+ if (datafiles.size() > 1000) {
+ MetadataTable.addDeleteEntries(extent, datafiles, SecurityConstants.getSystemCredentials());
+ datafiles.clear();
+ }
+ } else if (MetadataTable.TIME_COLUMN.hasColumns(key)) {
+ timeType = entry.getValue().toString().charAt(0);
+ } else if (key.compareColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
+ // tablets in the range must have been unloaded before we got here
+ throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
+ } else if (MetadataTable.DIRECTORY_COLUMN.hasColumns(key)) {
+ // the tablet directory itself is also scheduled for deletion
+ datafiles.add(new FileRef(this.master.fs, key));
+ if (datafiles.size() > 1000) {
+ MetadataTable.addDeleteEntries(extent, datafiles, SecurityConstants.getSystemCredentials());
+ datafiles.clear();
+ }
+ }
+ }
+ // flush whatever is left in the final partial batch
+ MetadataTable.addDeleteEntries(extent, datafiles, SecurityConstants.getSystemCredentials());
+ BatchWriter bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+ try {
+ deleteTablets(info, deleteRange, bw, conn);
+ } finally {
+ bw.close();
+ }
+
+ if (followingTablet != null) {
+ // stretch the following tablet backwards over the deleted range
+ Master.log.debug("Updating prevRow of " + followingTablet + " to " + extent.getPrevEndRow());
+ bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+ try {
+ Mutation m = new Mutation(followingTablet.getMetadataEntry());
+ MetadataTable.PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(extent.getPrevEndRow()));
+ MetadataTable.CHOPPED_COLUMN.putDelete(m);
+ bw.addMutation(m);
+ bw.flush();
+ } finally {
+ bw.close();
+ }
+ } else {
+ // Recreate the default tablet to hold the end of the table
+ Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow());
+ MetadataTable.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), Constants.DEFAULT_TABLET_LOCATION,
+ SecurityConstants.getSystemCredentials(), timeType, this.master.masterLock);
+ }
+ } catch (Exception ex) {
+ throw new AccumuloException(ex);
+ }
+ }
+
+ /**
+ * Collapses the metadata for all tablets in the merge range into the highest
+ * (last) tablet: copies their file entries onto it, computes the maximum
+ * logical time across the range, deletes the lower tablets' rows, and fixes
+ * the surviving tablet's prevRow.
+ *
+ * @param info the merge operation to carry out
+ * @throws AccumuloException if any step of the metadata update fails
+ */
+ private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
+ KeyExtent range = info.getExtent();
+ Master.log.debug("Merging metadata for " + range);
+ KeyExtent stop = getHighTablet(range);
+ Master.log.debug("Highest tablet is " + stop);
+ Value firstPrevRowValue = null;
+ Text stopRow = stop.getMetadataEntry();
+ Text start = range.getPrevEndRow();
+ if (start == null) {
+ start = new Text();
+ }
+ // everything strictly between the first tablet's row and the stop row
+ Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
+ String targetSystemTable = MetadataTable.NAME;
+ if (range.isMeta()) {
+ targetSystemTable = RootTable.NAME;
+ }
+
+ BatchWriter bw = null;
+ try {
+ long fileCount = 0;
+ Connector conn = this.master.getConnector();
+ // Make file entries in highest tablet
+ bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+ Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+ scanner.setRange(scanRange);
+ MetadataTable.PREV_ROW_COLUMN.fetch(scanner);
+ MetadataTable.TIME_COLUMN.fetch(scanner);
+ MetadataTable.DIRECTORY_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
+ Mutation m = new Mutation(stopRow);
+ String maxLogicalTime = null;
+ for (Entry<Key,Value> entry : scanner) {
+ Key key = entry.getKey();
+ Value value = entry.getValue();
+ if (key.getColumnFamily().equals(MetadataTable.DATAFILE_COLUMN_FAMILY)) {
+ // re-point the data file at the surviving (highest) tablet
+ m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
+ fileCount++;
+ } else if (MetadataTable.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
+ // remember the lowest tablet's prevRow; it becomes stop's prevRow
+ Master.log.debug("prevRow entry for lowest tablet is " + value);
+ firstPrevRowValue = new Value(value);
+ } else if (MetadataTable.TIME_COLUMN.hasColumns(key)) {
+ maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
+ } else if (MetadataTable.DIRECTORY_COLUMN.hasColumns(key)) {
+ // the merged-away tablet's directory is scheduled for deletion
+ bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
+ }
+ }
+
+ // read the logical time from the last tablet in the merge range, it is not included in
+ // the loop above
+ scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+ scanner.setRange(new Range(stopRow));
+ MetadataTable.TIME_COLUMN.fetch(scanner);
+ for (Entry<Key,Value> entry : scanner) {
+ if (MetadataTable.TIME_COLUMN.hasColumns(entry.getKey())) {
+ maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
+ }
+ }
+
+ if (maxLogicalTime != null)
+ // NOTE(review): getBytes() uses the platform default charset -- confirm
+ // this matches how the TIME column is decoded elsewhere
+ MetadataTable.TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
+
+ if (!m.getUpdates().isEmpty()) {
+ bw.addMutation(m);
+ }
+
+ bw.flush();
+
+ Master.log.debug("Moved " + fileCount + " files to " + stop);
+
+ if (firstPrevRowValue == null) {
+ // no tablets were found below stop, so there is nothing to merge
+ Master.log.debug("tablet already merged");
+ return;
+ }
+
+ stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
+ Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
+ Master.log.debug("Setting the prevRow for last tablet: " + stop);
+ bw.addMutation(updatePrevRow);
+ bw.flush();
+
+ deleteTablets(info, scanRange, bw, conn);
+
+ // Clean-up the last chopped marker
+ m = new Mutation(stopRow);
+ MetadataTable.CHOPPED_COLUMN.putDelete(m);
+ bw.addMutation(m);
+ bw.flush();
+
+ } catch (Exception ex) {
+ throw new AccumuloException(ex);
+ } finally {
+ if (bw != null)
+ try {
+ bw.close();
+ } catch (Exception ex) {
+ throw new AccumuloException(ex);
+ }
+ }
+ }
+
+ /**
+ * Deletes every metadata entry in the given range. All deletes for one
+ * tablet row are grouped into a single mutation so a tablet's row either
+ * disappears entirely or not at all if this process dies mid-loop.
+ *
+ * @param info operation being processed; selects root vs. metadata table
+ * @param scanRange metadata rows to remove
+ * @param bw writer to the target system table (not closed here)
+ * @param conn connector used to scan the metadata
+ */
+ private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
+ // Delete everything in the other tablets
+ // group all deletes into tablet into one mutation, this makes tablets
+ // either disappear entirely or not all.. this is important for the case
+ // where the process terminates in the loop below...
+ Scanner scanner = conn.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+ Master.log.debug("Deleting range " + scanRange);
+ scanner.setRange(scanRange);
+ RowIterator rowIter = new RowIterator(scanner);
+ while (rowIter.hasNext()) {
+ Iterator<Entry<Key,Value>> row = rowIter.next();
+ Mutation m = null;
+ while (row.hasNext()) {
+ Entry<Key,Value> entry = row.next();
+ Key key = entry.getKey();
+
+ if (m == null)
+ m = new Mutation(key.getRow());
+
+ m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+ Master.log.debug("deleting entry " + key);
+ }
+ // guard against an empty row producing a null mutation
+ if (m != null)
+ bw.addMutation(m);
+ }
+
+ bw.flush();
+ }
+
+ /**
+ * Finds the tablet that will absorb the merged or deleted range: the first
+ * tablet whose metadata row is at or after the end of the given range.
+ *
+ * @param range extent being merged or deleted
+ * @return the extent of the first tablet following the range
+ * @throws AccumuloException if no such tablet exists or the scan fails
+ */
+ private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
+ try {
+ Connector conn = this.master.getConnector();
+ Scanner scanner = conn.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+ MetadataTable.PREV_ROW_COLUMN.fetch(scanner);
+ KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
+ scanner.setRange(new Range(start.getMetadataEntry(), null));
+ Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+ if (!iterator.hasNext()) {
+ throw new AccumuloException("No last tablet for a merge " + range);
+ }
+ Entry<Key,Value> entry = iterator.next();
+ KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
+ // compare table ids by value; != on Text objects compares references
+ if (!highTablet.getTableId().equals(range.getTableId())) {
+ throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
+ }
+ return highTablet;
+ } catch (Exception ex) {
+ throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
+ }
+ }
+
+ /**
+ * Pushes accumulated per-scan changes out to the cluster: unassigns tablets
+ * stuck on dead servers, asks the balancer to choose servers for unassigned
+ * tablets, persists the new future locations, and (re)sends assignment
+ * requests to the chosen servers.
+ *
+ * @param currentTServers candidate destination servers (shutting-down servers already removed by the caller)
+ * @param assignments new assignments to persist and send
+ * @param assigned previously-assigned tablets that need an assignment reminder
+ * @param assignedToDeadServers tablets whose assigned server has died
+ * @param unassigned tablets needing a server, mapped to their last known server
+ */
+ private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
+ List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
+ if (!assignedToDeadServers.isEmpty()) {
+ // cap the log output; the full list could be enormous
+ int maxServersToShow = min(assignedToDeadServers.size(), 100);
+ Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
+ store.unassign(assignedToDeadServers);
+ this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
+ }
+
+ if (!currentTServers.isEmpty()) {
+ Map<KeyExtent,TServerInstance> assignedOut = new HashMap<KeyExtent,TServerInstance>();
+ // hand the balancer read-only views; it fills in assignedOut
+ this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut);
+ for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) {
+ if (unassigned.containsKey(assignment.getKey())) {
+ if (assignment.getValue() != null) {
+ Master.log.debug(store.name() + " assigning tablet " + assignment);
+ assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
+ }
+ } else {
+ // the balancer returned a tablet we never asked it to place
+ Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey());
+ }
+ }
+ if (!unassigned.isEmpty() && assignedOut.isEmpty())
+ Master.log.warn("Load balancer failed to assign any tablets");
+ }
+
+ if (assignments.size() > 0) {
+ Master.log.info(String.format("Assigning %d tablets", assignments.size()));
+ store.setFutureLocations(assignments);
+ }
+ assignments.addAll(assigned);
+ for (Assignment a : assignments) {
+ TServerConnection conn = this.master.tserverSet.getConnection(a.server);
+ if (conn != null) {
+ conn.assignTablet(this.master.masterLock, a.tablet);
+ } else {
+ Master.log.warn("Could not connect to server " + a.server);
+ }
+ }
+ }
+
+}
\ No newline at end of file
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java Mon Jun 24 19:29:39 2013
@@ -28,7 +28,6 @@ import org.apache.hadoop.io.Writable;
*
* Writable to serialize for zookeeper and the Tablet
*/
-
public class MergeInfo implements Writable {
public enum Operation {
@@ -36,28 +35,28 @@ public class MergeInfo implements Writab
}
MergeState state = MergeState.NONE;
- KeyExtent range;
+ KeyExtent extent;
Operation operation = Operation.MERGE;
public MergeInfo() {}
@Override
public void readFields(DataInput in) throws IOException {
- range = new KeyExtent();
- range.readFields(in);
+ extent = new KeyExtent();
+ extent.readFields(in);
state = MergeState.values()[in.readInt()];
operation = Operation.values()[in.readInt()];
}
@Override
public void write(DataOutput out) throws IOException {
- range.write(out);
+ extent.write(out);
out.writeInt(state.ordinal());
out.writeInt(operation.ordinal());
}
public MergeInfo(KeyExtent range, Operation op) {
- this.range = range;
+ this.extent = range;
this.operation = op;
}
@@ -65,8 +64,8 @@ public class MergeInfo implements Writab
return state;
}
- public KeyExtent getRange() {
- return range;
+ public KeyExtent getExtent() {
+ return extent;
}
public Operation getOperation() {
@@ -81,27 +80,28 @@ public class MergeInfo implements Writab
return this.operation.equals(Operation.DELETE);
}
- public boolean needsToBeChopped(KeyExtent extent) {
+ public boolean needsToBeChopped(KeyExtent otherExtent) {
// During a delete, the block after the merge will be stretched to cover the deleted area.
// Therefore, it needs to be chopped
- if (!extent.getTableId().equals(range.getTableId()))
+ if (!otherExtent.getTableId().equals(extent.getTableId()))
return false;
if (isDelete())
- return extent.getPrevEndRow() != null && extent.getPrevEndRow().equals(range.getEndRow());
+ return otherExtent.getPrevEndRow() != null && otherExtent.getPrevEndRow().equals(extent.getEndRow());
else
- return this.range.overlaps(extent);
+ return this.extent.overlaps(otherExtent);
}
- public boolean overlaps(KeyExtent extent) {
- boolean result = this.range.overlaps(extent);
- if (!result && needsToBeChopped(extent))
+ public boolean overlaps(KeyExtent otherExtent) {
+ boolean result = this.extent.overlaps(otherExtent);
+ if (!result && needsToBeChopped(otherExtent))
return true;
return result;
}
+ @Override
public String toString() {
if (!state.equals(MergeState.NONE))
- return "Merge " + operation.toString() + " of " + range + " State: " + state;
+ return "Merge " + operation.toString() + " of " + extent + " State: " + state;
return "No Merge in progress";
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java Mon Jun 24 19:29:39 2013
@@ -26,7 +26,6 @@ import org.apache.accumulo.core.client.S
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -56,9 +55,9 @@ public class MergeStats {
this.info = info;
if (info.getState().equals(MergeState.NONE))
return;
- if (info.getRange().getEndRow() == null)
+ if (info.getExtent().getEndRow() == null)
upperSplit = true;
- if (info.getRange().getPrevEndRow() == null)
+ if (info.getExtent().getPrevEndRow() == null)
lowerSplit = true;
}
@@ -71,11 +70,11 @@ public class MergeStats {
return;
if (info.getState().equals(MergeState.NONE))
return;
- if (!upperSplit && info.getRange().getEndRow().equals(ke.getPrevEndRow())) {
+ if (!upperSplit && info.getExtent().getEndRow().equals(ke.getPrevEndRow())) {
log.info("Upper split found");
upperSplit = true;
}
- if (!lowerSplit && info.getRange().getPrevEndRow().equals(ke.getEndRow())) {
+ if (!lowerSplit && info.getExtent().getPrevEndRow().equals(ke.getEndRow())) {
log.info("Lower split found");
lowerSplit = true;
}
@@ -103,79 +102,79 @@ public class MergeStats {
if (state == MergeState.NONE)
return state;
if (total == 0) {
- log.trace("failed to see any tablets for this range, ignoring " + info.getRange());
+ log.trace("failed to see any tablets for this range, ignoring " + info.getExtent());
return state;
}
- log.info("Computing next merge state for " + info.getRange() + " which is presently " + state + " isDelete : " + info.isDelete());
+ log.info("Computing next merge state for " + info.getExtent() + " which is presently " + state + " isDelete : " + info.isDelete());
if (state == MergeState.STARTED) {
state = MergeState.SPLITTING;
}
if (state == MergeState.SPLITTING) {
log.info(hosted + " are hosted, total " + total);
if (!info.isDelete() && total == 1) {
- log.info("Merge range is already contained in a single tablet " + info.getRange());
+ log.info("Merge range is already contained in a single tablet " + info.getExtent());
state = MergeState.COMPLETE;
} else if (hosted == total) {
if (info.isDelete()) {
if (!lowerSplit)
- log.info("Waiting for " + info + " lower split to occur " + info.getRange());
+ log.info("Waiting for " + info + " lower split to occur " + info.getExtent());
else if (!upperSplit)
- log.info("Waiting for " + info + " upper split to occur " + info.getRange());
+ log.info("Waiting for " + info + " upper split to occur " + info.getExtent());
else
state = MergeState.WAITING_FOR_CHOPPED;
} else {
state = MergeState.WAITING_FOR_CHOPPED;
}
} else {
- log.info("Waiting for " + hosted + " hosted tablets to be " + total + " " + info.getRange());
+ log.info("Waiting for " + hosted + " hosted tablets to be " + total + " " + info.getExtent());
}
}
if (state == MergeState.WAITING_FOR_CHOPPED) {
- log.info(chopped + " tablets are chopped " + info.getRange());
+ log.info(chopped + " tablets are chopped " + info.getExtent());
if (chopped == needsToBeChopped) {
state = MergeState.WAITING_FOR_OFFLINE;
} else {
- log.info("Waiting for " + chopped + " chopped tablets to be " + needsToBeChopped + " " + info.getRange());
+ log.info("Waiting for " + chopped + " chopped tablets to be " + needsToBeChopped + " " + info.getExtent());
}
}
if (state == MergeState.WAITING_FOR_OFFLINE) {
if (chopped != needsToBeChopped) {
- log.warn("Unexpected state: chopped tablets should be " + needsToBeChopped + " was " + chopped + " merge " + info.getRange());
+ log.warn("Unexpected state: chopped tablets should be " + needsToBeChopped + " was " + chopped + " merge " + info.getExtent());
// Perhaps a split occurred after we chopped, but before we went offline: start over
state = MergeState.WAITING_FOR_CHOPPED;
} else {
- log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getRange());
+ log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getExtent());
if (unassigned == total && chopped == needsToBeChopped) {
if (verifyMergeConsistency(connector, master))
state = MergeState.MERGING;
else
- log.info("Merge consistency check failed " + info.getRange());
+ log.info("Merge consistency check failed " + info.getExtent());
} else {
- log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + " " + info.getRange());
+ log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + " " + info.getExtent());
}
}
}
if (state == MergeState.MERGING) {
if (hosted != 0) {
// Shouldn't happen
- log.error("Unexpected state: hosted tablets should be zero " + hosted + " merge " + info.getRange());
+ log.error("Unexpected state: hosted tablets should be zero " + hosted + " merge " + info.getExtent());
state = MergeState.WAITING_FOR_OFFLINE;
}
if (unassigned != total) {
// Shouldn't happen
- log.error("Unexpected state: unassigned tablets should be " + total + " was " + unassigned + " merge " + info.getRange());
+ log.error("Unexpected state: unassigned tablets should be " + total + " was " + unassigned + " merge " + info.getExtent());
state = MergeState.WAITING_FOR_CHOPPED;
}
- log.info(unassigned + " tablets are unassigned " + info.getRange());
+ log.info(unassigned + " tablets are unassigned " + info.getExtent());
}
return state;
}
private boolean verifyMergeConsistency(Connector connector, CurrentState master) throws TableNotFoundException, IOException {
MergeStats verify = new MergeStats(info);
- Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ KeyExtent extent = info.getExtent();
+ Scanner scanner = connector.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
MetaDataTableScanner.configureScanner(scanner, master);
- KeyExtent extent = info.getRange();
Text start = extent.getPrevEndRow();
if (start == null) {
start = new Text();
@@ -183,10 +182,6 @@ public class MergeStats {
Text tableId = extent.getTableId();
Text first = KeyExtent.getMetadataEntry(tableId, start);
Range range = new Range(first, false, null, true);
- if (extent.isMeta()) {
- // don't go off the root tablet
- range = new Range(new Key(first).followingKey(PartialKey.ROW), false, RootTable.KEYSPACE.getEndKey(), false);
- }
scanner.setRange(range);
KeyExtent prevExtent = null;
@@ -255,7 +250,7 @@ public class MergeStats {
in.reset(data, data.length);
info.readFields(in);
}
- System.out.println(String.format("%25s %10s %10s %s", table, info.state, info.operation, info.range));
+ System.out.println(String.format("%25s %10s %10s %s", table, info.state, info.operation, info.extent));
}
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java Mon Jun 24 19:29:39 2013
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.hadoop.io.Text;
@@ -43,20 +44,30 @@ public class MetaDataStateStore extends
final protected Instance instance;
final protected CurrentState state;
final protected TCredentials auths;
+ final private String targetTableName;
- public MetaDataStateStore(Instance instance, TCredentials auths, CurrentState state) {
+ protected MetaDataStateStore(Instance instance, TCredentials auths, CurrentState state, String targetTableName) {
this.instance = instance;
this.state = state;
this.auths = auths;
+ this.targetTableName = targetTableName;
+ }
+
+ public MetaDataStateStore(Instance instance, TCredentials auths, CurrentState state) {
+ this(instance, auths, state, MetadataTable.NAME);
+ }
+
+ protected MetaDataStateStore(String tableName) {
+ this(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), null, tableName);
}
public MetaDataStateStore() {
- this(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), null);
+ this(MetadataTable.NAME);
}
-
+
@Override
public Iterator<TabletLocationState> iterator() {
- return new MetaDataTableScanner(instance, auths, MetadataTable.NON_ROOT_KEYSPACE, state);
+ return new MetaDataTableScanner(instance, auths, RootTable.METADATA_TABLETS_RANGE, state);
}
@Override
@@ -83,7 +94,7 @@ public class MetaDataStateStore extends
BatchWriter createBatchWriter() {
try {
- return instance.getConnector(auths.getPrincipal(), CredentialHelper.extractToken(auths)).createBatchWriter(MetadataTable.NAME,
+ return instance.getConnector(auths.getPrincipal(), CredentialHelper.extractToken(auths)).createBatchWriter(targetTableName,
new BatchWriterConfig().setMaxMemory(MAX_MEMORY).setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS));
} catch (TableNotFoundException e) {
// ya, I don't think so
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java Mon Jun 24 19:29:39 2013
@@ -51,10 +51,14 @@ public class MetaDataTableScanner implem
Iterator<Entry<Key,Value>> iter;
public MetaDataTableScanner(Instance instance, TCredentials auths, Range range, CurrentState state) {
+ this(instance, auths, range, state, MetadataTable.NAME);
+ }
+
+ MetaDataTableScanner(Instance instance, TCredentials auths, Range range, CurrentState state, String tableName) {
// scan over metadata table, looking for tablets in the wrong state based on the live servers and online tables
try {
Connector connector = instance.getConnector(auths.getPrincipal(), CredentialHelper.extractToken(auths));
- mdScanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 8);
+ mdScanner = connector.createBatchScanner(tableName, Authorizations.EMPTY, 8);
configureScanner(mdScanner, state);
mdScanner.setRanges(Collections.singletonList(range));
iter = mdScanner.iterator();
@@ -81,7 +85,11 @@ public class MetaDataTableScanner implem
}
public MetaDataTableScanner(Instance instance, TCredentials auths, Range range) {
- this(instance, auths, range, null);
+ this(instance, auths, range, MetadataTable.NAME);
+ }
+
+ public MetaDataTableScanner(Instance instance, TCredentials auths, Range range, String tableName) {
+ this(instance, auths, range, null, tableName);
}
public void close() {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java Mon Jun 24 19:29:39 2013
@@ -25,12 +25,16 @@ import org.apache.accumulo.core.util.Roo
public class RootTabletStateStore extends MetaDataStateStore {
public RootTabletStateStore(Instance instance, TCredentials auths, CurrentState state) {
- super(instance, auths, state);
+ super(instance, auths, state, RootTable.NAME);
+ }
+
+ public RootTabletStateStore() {
+ super(RootTable.NAME);
}
@Override
public Iterator<TabletLocationState> iterator() {
- return new MetaDataTableScanner(instance, auths, RootTable.KEYSPACE, state);
+ return new MetaDataTableScanner(instance, auths, RootTable.METADATA_TABLETS_RANGE, state, RootTable.NAME);
}
@Override
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java Mon Jun 24 19:29:39 2013
@@ -98,7 +98,7 @@ public class TabletStateChangeIterator e
while (buffer.available() > 0) {
MergeInfo mergeInfo = new MergeInfo();
mergeInfo.readFields(buffer);
- result.put(mergeInfo.range.getTableId(), mergeInfo);
+ result.put(mergeInfo.extent.getTableId(), mergeInfo);
}
return result;
} catch (Exception ex) {
@@ -126,7 +126,7 @@ public class TabletStateChangeIterator e
}
// we always want data about merges
MergeInfo merge = merges.get(tls.extent.getTableId());
- if (merge != null && merge.getRange() != null && merge.getRange().overlaps(tls.extent)) {
+ if (merge != null && merge.getExtent() != null && merge.getExtent().overlaps(tls.extent)) {
return;
}
// is the table supposed to be online or offline?
@@ -173,7 +173,7 @@ public class TabletStateChangeIterator e
DataOutputBuffer buffer = new DataOutputBuffer();
try {
for (MergeInfo info : merges) {
- KeyExtent extent = info.getRange();
+ KeyExtent extent = info.getExtent();
if (extent != null && !info.getState().equals(MergeState.NONE)) {
info.write(buffer);
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java Mon Jun 24 19:29:39 2013
@@ -37,6 +37,7 @@ public abstract class TabletStateStore i
/**
* Scan the information about the tablets covered by this store
*/
+ @Override
abstract public Iterator<TabletLocationState> iterator();
/**
@@ -68,6 +69,8 @@ public abstract class TabletStateStore i
TabletStateStore store;
if (tls.extent.isRootTablet()) {
store = new ZooTabletStateStore();
+ } else if (tls.extent.isMeta()) {
+ store = new RootTabletStateStore();
} else {
store = new MetaDataStateStore();
}
@@ -78,6 +81,8 @@ public abstract class TabletStateStore i
TabletStateStore store;
if (assignment.tablet.isRootTablet()) {
store = new ZooTabletStateStore();
+ } else if (assignment.tablet.isMeta()) {
+ store = new RootTabletStateStore();
} else {
store = new MetaDataStateStore();
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java Mon Jun 24 19:29:39 2013
@@ -88,7 +88,7 @@ public class ZooTabletStateStore extends
log.debug("root tablet logSet " + logEntry.logSet);
}
}
- TabletLocationState result = new TabletLocationState(RootTable.ROOT_TABLET_EXTENT, futureSession, currentSession, lastSession, logs, false);
+ TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
log.debug("Returning root tablet state: " + result);
return result;
} catch (Exception ex) {
@@ -120,7 +120,7 @@ public class ZooTabletStateStore extends
if (assignments.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
Assignment assignment = assignments.iterator().next();
- if (assignment.tablet.compareTo(RootTable.ROOT_TABLET_EXTENT) != 0)
+ if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet location");
String value = AddressUtil.toString(assignment.server.getLocation()) + "|" + assignment.server.getSession();
Iterator<TabletLocationState> currentIter = iterator();
@@ -136,7 +136,7 @@ public class ZooTabletStateStore extends
if (assignments.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
Assignment assignment = assignments.iterator().next();
- if (assignment.tablet.compareTo(RootTable.ROOT_TABLET_EXTENT) != 0)
+ if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet location");
String value = AddressUtil.toString(assignment.server.getLocation()) + "|" + assignment.server.getSession();
Iterator<TabletLocationState> currentIter = iterator();
@@ -159,7 +159,7 @@ public class ZooTabletStateStore extends
if (tablets.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
TabletLocationState tls = tablets.iterator().next();
- if (tls.extent.compareTo(RootTable.ROOT_TABLET_EXTENT) != 0)
+ if (tls.extent.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet location");
store.remove(RootTable.ZROOT_TABLET_LOCATION);
store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java Mon Jun 24 19:29:39 2013
@@ -98,7 +98,7 @@ class CompactionDriver extends MasterRep
Range range = new KeyExtent(new Text(tableId), null, startRow == null ? null : new Text(startRow)).toMetadataRange();
if (tableId.equals(MetadataTable.ID))
- range = range.clip(new Range(RootTable.ROOT_TABLET_EXTENT.getMetadataEntry(), false, null, true));
+ range = range.clip(new Range(RootTable.EXTENT.getMetadataEntry(), false, null, true));
scanner.setRange(range);
MetadataTable.COMPACT_COLUMN.fetch(scanner);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java Mon Jun 24 19:29:39 2013
@@ -16,28 +16,16 @@
*/
package org.apache.accumulo.server.master.tableOps;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
-import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.MergeInfo;
import org.apache.accumulo.server.master.state.MergeInfo.Operation;
import org.apache.accumulo.server.master.state.MergeState;
-import org.apache.accumulo.server.util.MetadataTable;
import org.apache.hadoop.io.Text;
/**
@@ -53,29 +41,6 @@ import org.apache.hadoop.io.Text;
* The code below uses read-write lock to prevent some operations while a merge is taking place. Normal operations, like bulk imports, will grab the read lock
* and prevent merges (writes) while they run. Merge operations will lock out some operations while they run.
*/
-
-class MakeDeleteEntries extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- log.info("creating delete entries for merged metadata tablets");
- Connector conn = master.getConnector();
- Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- scanner.setRange(RootTable.KEYSPACE);
- scanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- for (Entry<Key,Value> entry : scanner) {
- // TODO: add the entries only if there are no !METADATA table references - ACCUMULO-1308
- FileRef ref = new FileRef(master.getFileSystem(), entry.getKey());
- bw.addMutation(MetadataTable.createDeleteMutation(MetadataTable.ID, ref.path().toString()));
- }
- bw.close();
- return null;
- }
-}
-
class TableRangeOpWait extends MasterRepo {
private static final long serialVersionUID = 1L;
@@ -101,13 +66,6 @@ class TableRangeOpWait extends MasterRep
log.info("removing merge information " + mergeInfo);
master.clearMergeState(tableIdText);
Utils.unreserveTable(tableId, tid, true);
- // We can't add entries to the metadata table if it is offline for this merge.
- // If the delete entries for the metadata table were in the root tablet, it would work just fine
- // but all the delete entries go into the end of the metadata table. Work around: add the
- // delete entries after the merge completes.
- if (mergeInfo.getOperation().equals(Operation.MERGE) && tableId.equals(MetadataTable.ID)) {
- return new MakeDeleteEntries();
- }
return null;
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java Mon Jun 24 19:29:39 2013
@@ -33,6 +33,8 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -144,23 +146,27 @@ public class TablesServlet extends Basic
private void doTableDetails(HttpServletRequest req, StringBuilder sb, Map<String,String> tidToNameMap, String tableId) {
String displayName = Tables.getPrintableTableNameFromId(tidToNameMap, tableId);
Instance instance = HdfsZooInstance.getInstance();
- MetaDataTableScanner scanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), new Range(KeyExtent.getMetadataEntry(new Text(
- tableId), new Text()),
- KeyExtent.getMetadataEntry(
- new Text(tableId), null)));
-
TreeSet<String> locs = new TreeSet<String>();
- while (scanner.hasNext()) {
- TabletLocationState state = scanner.next();
- if (state.current != null) {
- try {
- locs.add(state.current.hostPort());
- } catch (Exception ex) {
- log.error(ex, ex);
+ if (RootTable.ID.equals(tableId)) {
+ locs.add(instance.getRootTabletLocation());
+ } else {
+ String systemTableName = MetadataTable.ID.equals(tableId) ? RootTable.NAME : MetadataTable.NAME;
+ MetaDataTableScanner scanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), new Range(KeyExtent.getMetadataEntry(
+ new Text(tableId), new Text()), KeyExtent.getMetadataEntry(new Text(tableId), null)), systemTableName);
+
+ while (scanner.hasNext()) {
+ TabletLocationState state = scanner.next();
+ if (state.current != null) {
+ try {
+ locs.add(state.current.hostPort());
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
}
}
+ scanner.close();
}
- scanner.close();
+
log.debug("Locs: " + locs);
List<TabletServerStatus> tservers = new ArrayList<TabletServerStatus>();
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Mon Jun 24 19:29:39 2013
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -85,7 +86,7 @@ public class ProblemReports implements I
log.debug("Filing problem report " + pr.getTableName() + " " + pr.getProblemType() + " " + pr.getResource());
try {
- if (pr.getTableName().equals(MetadataTable.ID)) {
+ if (pr.getTableName().equals(MetadataTable.ID) || pr.getTableName().equals(RootTable.ID)) {
// file report in zookeeper
pr.saveToZooKeeper();
} else {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java Mon Jun 24 19:29:39 2013
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.master.Master;
@@ -255,7 +256,7 @@ public class SecurityOperation {
targetUserExists(user);
- if (table.equals(MetadataTable.ID) && permission.equals(TablePermission.READ))
+ if ((table.equals(MetadataTable.ID) || table.equals(RootTable.ID)) && permission.equals(TablePermission.READ))
return true;
try {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java Mon Jun 24 19:29:39 2013
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -58,6 +59,7 @@ public class ZKPermHandler implements Pe
return zkPermHandlerInstance;
}
+ @Override
public void initialize(String instanceId, boolean initialize) {
ZKUserPath = ZKSecurityTool.getInstancePath(instanceId) + "/users";
ZKTablePath = ZKSecurityTool.getInstancePath(instanceId) + "/tables";
@@ -251,7 +253,8 @@ public class ZKPermHandler implements Pe
for (SystemPermission p : SystemPermission.values())
rootPerms.add(p);
Map<String,Set<TablePermission>> tablePerms = new HashMap<String,Set<TablePermission>>();
- // Allow the root user to flush the !METADATA table
+ // Allow the root user to flush the system tables
+ tablePerms.put(RootTable.ID, Collections.singleton(TablePermission.ALTER_TABLE));
tablePerms.put(MetadataTable.ID, Collections.singleton(TablePermission.ALTER_TABLE));
try {
@@ -276,6 +279,7 @@ public class ZKPermHandler implements Pe
* @param user
* @throws AccumuloSecurityException
*/
+ @Override
public void initUser(String user) throws AccumuloSecurityException {
IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
try {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Mon Jun 24 19:29:39 2013
@@ -86,6 +86,7 @@ import org.apache.accumulo.core.util.Cac
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.ServerConstants;
@@ -683,7 +684,7 @@ public class Tablet {
}
// Remove any bulk files we've previously loaded and compacted away
List<FileRef> files = MetadataTable.getBulkFilesLoaded(conn, extent, tid);
-
+
for (FileRef file : files)
if (paths.keySet().remove(file.path()))
log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
@@ -777,7 +778,8 @@ public class Tablet {
mergingMinorCompactionFile = null;
}
- void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) throws IOException {
+ void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
+ throws IOException {
IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
if (extent.isRootTablet()) {
@@ -846,8 +848,8 @@ public class Tablet {
persistedTime = commitSession.getMaxCommittedTime();
String time = tabletTime.getMetadataValue(persistedTime);
- MetadataTable.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, creds, filesInUseByScans,
- tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
+ MetadataTable.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, creds, filesInUseByScans, tabletServer.getClientAddressString(),
+ tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
}
} finally {
@@ -923,7 +925,8 @@ public class Tablet {
majorCompactingFiles.clear();
}
- void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv) throws IOException {
+ void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
+ throws IOException {
long t1, t2;
if (!extent.isRootTablet()) {
@@ -1029,8 +1032,8 @@ public class Tablet {
Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
if (filesInUseByScans.size() > 0)
log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
- MetadataTable.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv,
- SecurityConstants.getSystemCredentials(), tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
+ MetadataTable.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SecurityConstants.getSystemCredentials(),
+ tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
removeFilesAfterScan(filesInUseByScans);
}
@@ -1068,16 +1071,14 @@ public class Tablet {
private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
SortedMap<Key,Value> tabletsKeyValues) throws IOException {
- this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(),
- tabletsKeyValues);
+ this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), tabletsKeyValues);
}
static private final List<LogEntry> EMPTY = Collections.emptyList();
private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
SortedMap<FileRef,DataFileValue> datafiles, String time, long initFlushID, long initCompactID) throws IOException {
- this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY,
- datafiles, time, null, new HashSet<FileRef>(), initFlushID, initCompactID);
+ this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new HashSet<FileRef>(), initFlushID, initCompactID);
}
private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
@@ -1124,7 +1125,8 @@ public class Tablet {
Text rowName = extent.getMetadataEntry();
- ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), MetadataTable.ID, Authorizations.EMPTY);
+ String tableId = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
+ ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), tableId, Authorizations.EMPTY);
// Commented out because when no data file is present, each tablet will scan through metadata table and return nothing
// reduced batch size to improve performance
@@ -1233,11 +1235,11 @@ public class Tablet {
* yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time
*/
private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf,
- final VolumeManager fs, final List<LogEntry> logEntries, final SortedMap<FileRef,DataFileValue> datafiles, String time, final TServerInstance lastLocation,
- Set<FileRef> scanFiles, long initFlushID, long initCompactID) throws IOException {
+ final VolumeManager fs, final List<LogEntry> logEntries, final SortedMap<FileRef,DataFileValue> datafiles, String time,
+ final TServerInstance lastLocation, Set<FileRef> scanFiles, long initFlushID, long initCompactID) throws IOException {
if (location.find(":") >= 0) {
- this.location = new Path(location.toString());
- } else {
+ this.location = new Path(location.toString());
+ } else {
this.location = new Path(ServerConstants.getTablesDirs()[0] + "/" + extent.getTableId().toString() + location.toString());
}
this.location = this.location.makeQualified(fs.getFileSystemByPath(this.location));
@@ -1259,8 +1261,7 @@ public class Tablet {
for (FileRef ref : datafiles.keySet()) {
Path path = ref.path();
FileSystem ns = fs.getFileSystemByPath(path);
- FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(),
- tabletServer.getTableConfiguration(extent));
+ FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), tabletServer.getTableConfiguration(extent));
long maxTime = -1;
try {
@@ -2147,8 +2148,8 @@ public class Tablet {
}
span.stop();
span = Trace.start("compact");
- this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile, true, queued,
- commitSession, flushId, mincReason);
+ this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile, true, queued, commitSession, flushId,
+ mincReason);
span.stop();
if (needsSplit()) {
@@ -3277,7 +3278,7 @@ public class Tablet {
});
for (Iterator<Entry<FileRef,Long>> iterator = filesToCompact.entrySet().iterator(); iterator.hasNext();) {
- Entry<FileRef,Long> entry = (Entry<FileRef,Long>) iterator.next();
+ Entry<FileRef,Long> entry = iterator.next();
fileHeap.add(new Pair<FileRef,Long>(entry.getKey(), entry.getValue()));
}
@@ -3459,8 +3460,7 @@ public class Tablet {
// this info is used for optimization... it is ok if map files are missing
// from the set... can still query and insert into the tablet while this
// map file operation is happening
- Map<FileRef,FileUtil.FileInfo> firstAndLastRows = FileUtil.tryToGetFirstAndLastRows(fs,
- tabletServer.getSystemConfiguration(), datafileManager.getFiles());
+ Map<FileRef,FileUtil.FileInfo> firstAndLastRows = FileUtil.tryToGetFirstAndLastRows(fs, tabletServer.getSystemConfiguration(), datafileManager.getFiles());
synchronized (this) {
// java needs tuples ...
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon Jun 24 19:29:39 2013
@@ -881,7 +881,7 @@ public class TabletServer extends Abstra
for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
TKeyExtent tke = entry.getKey();
Map<String,MapFileInfo> fileMap = entry.getValue();
- Map<FileRef, MapFileInfo> fileRefMap = new HashMap<FileRef, MapFileInfo>();
+ Map<FileRef,MapFileInfo> fileRefMap = new HashMap<FileRef,MapFileInfo>();
for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
Path path = new Path(mapping.getKey());
FileSystem ns = fs.getFileSystemByPath(path);
@@ -2883,11 +2883,14 @@ public class TabletServer extends Abstra
if (extent.isRootTablet()) {
return verifyRootTablet(extent, instance);
}
+ String tableToVerify = MetadataTable.ID;
+ if (extent.isMeta())
+ tableToVerify = RootTable.ID;
List<ColumnFQ> columnsToFetch = Arrays.asList(new ColumnFQ[] {MetadataTable.DIRECTORY_COLUMN, MetadataTable.PREV_ROW_COLUMN,
MetadataTable.SPLIT_RATIO_COLUMN, MetadataTable.OLD_PREV_ROW_COLUMN, MetadataTable.TIME_COLUMN});
- ScannerImpl scanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), MetadataTable.ID, Authorizations.EMPTY);
+ ScannerImpl scanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), tableToVerify, Authorizations.EMPTY);
scanner.setRange(extent.toMetadataRange());
TreeMap<Key,Value> tkv = new TreeMap<Key,Value>();
@@ -3233,7 +3236,6 @@ public class TabletServer extends Abstra
}
}
-
public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
totalMinorCompactions++;
logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3243,7 +3245,8 @@ public class TabletServer extends Abstra
logger.minorCompactionStarted(tablet, lastUpdateSequence, newMapfileLocation);
}
- public void recover(VolumeManager fs, Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
+ public void recover(VolumeManager fs, Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver)
+ throws IOException {
List<Path> recoveryLogs = new ArrayList<Path>();
List<LogEntry> sorted = new ArrayList<LogEntry>(logEntries);
Collections.sort(sorted, new Comparator<LogEntry>() {
@@ -3473,7 +3476,7 @@ public class TabletServer extends Abstra
}
};
}
-
+
public VolumeManager getFileSystem() {
return fs;
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Mon Jun 24 19:29:39 2013
@@ -710,7 +710,7 @@ public class TabletServerResourceManager
}
public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
- if (tablet.equals(RootTable.ROOT_TABLET_EXTENT)) {
+ if (tablet.equals(RootTable.EXTENT)) {
rootMajorCompactionThreadPool.execute(compactionTask);
} else if (tablet.isMeta()) {
defaultMajorCompactionThreadPool.execute(compactionTask);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java Mon Jun 24 19:29:39 2013
@@ -61,8 +61,8 @@ public class AddFilesWithMissingEntries
BatchWriterOpts bwOpts = new BatchWriterOpts();
opts.parseArgs(AddFilesWithMissingEntries.class.getName(), args, bwOpts);
- final Key rootTableEnd = new Key(RootTable.ROOT_TABLET_EXTENT.getEndRow());
- final Range range = new Range(rootTableEnd.followingKey(PartialKey.ROW), true, MetadataTable.RESERVED_KEYSPACE_START_KEY, false);
+ final Key rootTableEnd = new Key(RootTable.EXTENT.getEndRow());
+ final Range range = new Range(rootTableEnd.followingKey(PartialKey.ROW), true, MetadataTable.RESERVED_RANGE_START_KEY, false);
final Scanner scanner = opts.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
scanner.setRange(range);
final Configuration conf = new Configuration();
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java?rev=1496171&r1=1496170&r2=1496171&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java Mon Jun 24 19:29:39 2013
@@ -48,7 +48,7 @@ public class FindOfflineTablets {
opts.parseArgs(FindOfflineTablets.class.getName(), args);
final AtomicBoolean scanning = new AtomicBoolean(false);
Instance instance = opts.getInstance();
- MetaDataTableScanner rootScanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), RootTable.KEYSPACE);
+ MetaDataTableScanner rootScanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), RootTable.METADATA_TABLETS_RANGE);
MetaDataTableScanner metaScanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), MetadataTable.NON_ROOT_KEYSPACE);
@SuppressWarnings("unchecked")
Iterator<TabletLocationState> scanner = (Iterator<TabletLocationState>)new IteratorChain(rootScanner, metaScanner);
@@ -67,7 +67,7 @@ public class FindOfflineTablets {
TabletLocationState locationState = scanner.next();
TabletState state = locationState.getState(tservers.getCurrentServers());
if (state != null && state != TabletState.HOSTED && TableManager.getInstance().getTableState(locationState.extent.getTableId().toString()) != TableState.OFFLINE)
- if (!locationState.extent.equals(RootTable.ROOT_TABLET_EXTENT))
+ if (!locationState.extent.equals(RootTable.EXTENT))
System.out.println(locationState + " is " + state + " #walogs:" + locationState.walogs.size());
}
}