You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2020/04/06 14:36:53 UTC
[accumulo] branch master updated: Update Last Location. Solution for #1169 (#1453)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new d12ff6c Update Last Location. Solution for #1169 (#1453)
d12ff6c is described below
commit d12ff6c700e003202fd2e9842499324561246b15
Author: Jeffrey Manno <je...@gmail.com>
AuthorDate: Mon Apr 6 10:36:44 2020 -0400
Update Last Location. Solution for #1169 (#1453)
* Add tablet suspension mutations to Ample and use them in the tablet state store functions
* Remove unused functions and finalize the refactoring changes
* Replace the Collection<Assignment> parameter of setFutureLocations/setLocations with a single Assignment
* Delete more unused code and apply various requested review changes
Co-authored-by: Jeffrey Manno <je...@gmail.com>
Co-authored-by: Mike Miller <mm...@apache.org>
---
.../accumulo/core/metadata/schema/Ample.java | 4 +
.../master/state/DistributedStoreException.java | 33 -----
.../master/state/LoggingTabletStateStore.java | 21 ++--
.../server/master/state/MetaDataStateStore.java | 133 +++++++--------------
.../server/master/state/SuspendingTServer.java | 11 --
.../server/master/state/TServerInstance.java | 34 ------
.../server/master/state/TabletStateStore.java | 23 ++--
.../server/master/state/ZooTabletStateStore.java | 26 ++--
.../server/metadata/TabletMutatorBase.java | 17 +++
.../accumulo/server/util/MasterMetadataUtil.java | 22 +---
.../master/state/RootTabletStateStoreTest.java | 13 +-
.../apache/accumulo/master/TabletGroupWatcher.java | 7 +-
.../org/apache/accumulo/tserver/TabletServer.java | 5 +-
.../accumulo/tserver/tablet/DatafileManager.java | 4 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 7 +-
.../test/MasterRepairsDualAssignmentIT.java | 24 ++--
.../accumulo/test/functional/SplitRecoveryIT.java | 12 +-
.../apache/accumulo/test/master/MergeStateIT.java | 6 +-
.../accumulo/test/performance/NullTserver.java | 9 +-
19 files changed, 138 insertions(+), 273 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index a7039d4..bd3eb09 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -222,6 +222,10 @@ public interface Ample {
public TabletMutator putChopped();
+ public TabletMutator putSuspension(TServer tserver, long suspensionTime);
+
+ public TabletMutator deleteSuspension();
+
/**
* This method persist (or queues for persisting) previous put and deletes against this object.
* Unless this method is called, previous calls will never be persisted. The purpose of this
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
deleted file mode 100644
index a099bbf..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-public class DistributedStoreException extends Exception {
-
- private static final long serialVersionUID = 1L;
-
- public DistributedStoreException(String why) {
- super(why);
- }
-
- public DistributedStoreException(Exception cause) {
- super(cause);
- }
-
-}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/LoggingTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/LoggingTabletStateStore.java
index 8b5fe1a..4249fc2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/LoggingTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/LoggingTabletStateStore.java
@@ -48,21 +48,21 @@ class LoggingTabletStateStore implements TabletStateStore {
}
@Override
- public void setFutureLocations(Collection<Assignment> assignments)
- throws DistributedStoreException {
- wrapped.setFutureLocations(assignments);
- assignments.forEach(assignment -> TabletLogger.assigned(assignment.tablet, assignment.server));
+ public void setFutureLocation(Assignment assignment) {
+ wrapped.setFutureLocation(assignment);
+ TabletLogger.assigned(assignment.tablet, assignment.server);
+
}
@Override
- public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
- wrapped.setLocations(assignments);
- assignments.forEach(assignment -> TabletLogger.loaded(assignment.tablet, assignment.server));
+ public void setLocation(Assignment assignment, TServerInstance prevLastLoc) {
+ wrapped.setLocation(assignment, prevLastLoc);
+ TabletLogger.loaded(assignment.tablet, assignment.server);
}
@Override
public void unassign(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException {
+ Map<TServerInstance,List<Path>> logsForDeadServers) {
wrapped.unassign(tablets, logsForDeadServers);
if (logsForDeadServers == null)
@@ -75,8 +75,7 @@ class LoggingTabletStateStore implements TabletStateStore {
@Override
public void suspend(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
- throws DistributedStoreException {
+ Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) {
wrapped.suspend(tablets, logsForDeadServers, suspensionTimestamp);
if (logsForDeadServers == null)
@@ -89,7 +88,7 @@ class LoggingTabletStateStore implements TabletStateStore {
}
@Override
- public void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+ public void unsuspend(Collection<TabletLocationState> tablets) {
wrapped.unsuspend(tablets);
for (TabletLocationState tls : tablets) {
TabletLogger.unsuspended(tls.extent);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 6adef3d..ce2fc7f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -21,17 +21,17 @@ package org.apache.accumulo.server.master.state;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class MetaDataStateStore implements TabletStateStore {
@@ -42,10 +42,13 @@ class MetaDataStateStore implements TabletStateStore {
protected final ClientContext context;
protected final CurrentState state;
private final String targetTableName;
+ private final Ample ample;
+ private static final Logger log = LoggerFactory.getLogger(MetaDataStateStore.class);
protected MetaDataStateStore(ClientContext context, CurrentState state, String targetTableName) {
this.context = context;
this.state = state;
+ this.ample = context.getAmple();
this.targetTableName = targetTableName;
}
@@ -60,135 +63,87 @@ class MetaDataStateStore implements TabletStateStore {
}
@Override
- public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
- for (Assignment assignment : assignments) {
- Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
- assignment.server.putLocation(m);
- assignment.server.clearFutureLocation(m);
- SuspendingTServer.clearSuspension(m);
- writer.addMutation(m);
- }
- } catch (Exception ex) {
- throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
- }
- }
- }
+ public void setLocation(Assignment assignment, TServerInstance prevLastLoc) {
- BatchWriter createBatchWriter() {
- try {
- return context.createBatchWriter(targetTableName,
- new BatchWriterConfig().setMaxMemory(MAX_MEMORY)
- .setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS));
- } catch (Exception e) {
- throw new RuntimeException(e);
+ TabletMutator tabletMutator = ample.mutateTablet(assignment.tablet);
+ tabletMutator.putLocation(assignment.server, LocationType.CURRENT);
+ tabletMutator.putLocation(assignment.server, LocationType.LAST);
+ tabletMutator.deleteLocation(assignment.server, LocationType.FUTURE);
+
+ // remove the old location
+ if (prevLastLoc != null && !prevLastLoc.equals(assignment.server)) {
+ tabletMutator.deleteLocation(prevLastLoc, LocationType.LAST);
}
+
+ tabletMutator.mutate();
+
}
@Override
- public void setFutureLocations(Collection<Assignment> assignments)
- throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
- for (Assignment assignment : assignments) {
- Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
- SuspendingTServer.clearSuspension(m);
- assignment.server.putFutureLocation(m);
- writer.addMutation(m);
- }
- } catch (Exception ex) {
- throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
- }
- }
+ public void setFutureLocation(Assignment assignment) {
+
+ TabletMutator tabletMutator = ample.mutateTablet(assignment.tablet);
+ tabletMutator.deleteSuspension();
+ tabletMutator.putLocation(assignment.server, LocationType.FUTURE);
+ tabletMutator.mutate();
+
}
@Override
public void unassign(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException {
+ Map<TServerInstance,List<Path>> logsForDeadServers) {
unassign(tablets, logsForDeadServers, -1);
}
@Override
public void suspend(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
- throws DistributedStoreException {
+ Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) {
unassign(tablets, logsForDeadServers, suspensionTimestamp);
}
private void unassign(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
- throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
+ Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) {
+ try (var tabletsMutator = ample.mutateTablets()) {
for (TabletLocationState tls : tablets) {
- Mutation m = new Mutation(tls.extent.getMetadataEntry());
+ TabletMutator tabletMutator = tabletsMutator.mutateTablet(tls.extent);
if (tls.current != null) {
- tls.current.clearLocation(m);
+ tabletMutator.deleteLocation(tls.current, LocationType.CURRENT);
if (logsForDeadServers != null) {
List<Path> logs = logsForDeadServers.get(tls.current);
if (logs != null) {
for (Path log : logs) {
LogEntry entry =
new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
- m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+ tabletMutator.putWal(entry);
}
}
}
if (suspensionTimestamp >= 0) {
- SuspendingTServer suspender =
- new SuspendingTServer(tls.current.getLocation(), suspensionTimestamp);
- suspender.setSuspension(m);
+ tabletMutator.putSuspension(tls.current, suspensionTimestamp);
}
}
if (tls.suspend != null && suspensionTimestamp < 0) {
- SuspendingTServer.clearSuspension(m);
+ tabletMutator.deleteSuspension();
}
if (tls.future != null) {
- tls.future.clearFutureLocation(m);
+ tabletMutator.deleteLocation(tls.future, LocationType.FUTURE);
}
- writer.addMutation(m);
- }
- } catch (Exception ex) {
- throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
+ tabletMutator.mutate();
}
}
}
@Override
- public void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
+ public void unsuspend(Collection<TabletLocationState> tablets) {
+
+ try (var tabletsMutator = ample.mutateTablets()) {
for (TabletLocationState tls : tablets) {
if (tls.suspend != null) {
continue;
}
- Mutation m = new Mutation(tls.extent.getMetadataEntry());
- SuspendingTServer.clearSuspension(m);
- writer.addMutation(m);
- }
- } catch (Exception ex) {
- throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
+ TabletMutator tabletMutator = tabletsMutator.mutateTablet(tls.extent);
+ tabletMutator.deleteSuspension();
+ tabletMutator.mutate();
}
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
index c6bb343..52de28f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
@@ -18,11 +18,8 @@
*/
package org.apache.accumulo.server.master.state;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN;
-
import java.util.Objects;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.HostAndPort;
@@ -57,14 +54,6 @@ public class SuspendingTServer {
return server.equals(rhs.server) && suspensionTime == rhs.suspensionTime;
}
- public void setSuspension(Mutation m) {
- m.put(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier(), toValue());
- }
-
- public static void clearSuspension(Mutation m) {
- m.putDelete(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier());
- }
-
@Override
public int hashCode() {
return Objects.hash(server, suspensionTime);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
index a539643..d5e40c5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@ -25,10 +25,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.HostAndPort;
@@ -81,30 +79,6 @@ public class TServerInstance implements Ample.TServer, Comparable<TServerInstanc
this(location.getHostAndPort(), location.getSession());
}
- public void putLocation(Mutation m) {
- m.put(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
- }
-
- public void putFutureLocation(Mutation m) {
- m.put(TabletsSection.FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
- }
-
- public void putLastLocation(Mutation m) {
- m.put(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
- }
-
- public void clearLastLocation(Mutation m) {
- m.putDelete(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier());
- }
-
- public void clearFutureLocation(Mutation m) {
- m.putDelete(TabletsSection.FutureLocationColumnFamily.NAME, asColumnQualifier());
- }
-
- public void clearLocation(Mutation m) {
- m.putDelete(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier());
- }
-
@Override
public int compareTo(TServerInstance other) {
if (this == other)
@@ -138,14 +112,6 @@ public class TServerInstance implements Ample.TServer, Comparable<TServerInstanc
return getLocation().toString();
}
- private Text asColumnQualifier() {
- return new Text(this.getSession());
- }
-
- private Value asMutationValue() {
- return new Value(getLocation().toString());
- }
-
@Override
public HostAndPort getLocation() {
return location;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index e74c9ec..5182677 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -48,12 +48,12 @@ public interface TabletStateStore extends Iterable<TabletLocationState> {
/**
* Store the assigned locations in the data store.
*/
- void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+ void setFutureLocation(Assignment assignment);
/**
* Tablet servers will update the data store with the location when they bring the tablet online
*/
- void setLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+ void setLocation(Assignment assignment, TServerInstance prevLastLoc);
/**
* Mark the tablets as having no known or future location.
@@ -64,38 +64,35 @@ public interface TabletStateStore extends Iterable<TabletLocationState> {
* a cache of logs in use by servers when they died
*/
void unassign(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException;
+ Map<TServerInstance,List<Path>> logsForDeadServers);
/**
* Mark tablets as having no known or future location, but desiring to be returned to their
* previous tserver.
*/
void suspend(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
- throws DistributedStoreException;
+ Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp);
/**
* Remove a suspension marker for a collection of tablets, moving them to being simply unassigned.
*/
- void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException;
+ void unsuspend(Collection<TabletLocationState> tablets);
public static void unassign(ServerContext context, TabletLocationState tls,
- Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException {
+ Map<TServerInstance,List<Path>> logsForDeadServers) {
getStoreForTablet(tls.extent, context).unassign(Collections.singletonList(tls),
logsForDeadServers);
}
public static void suspend(ServerContext context, TabletLocationState tls,
- Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
- throws DistributedStoreException {
+ Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) {
getStoreForTablet(tls.extent, context).suspend(Collections.singletonList(tls),
logsForDeadServers, suspensionTimestamp);
}
- public static void setLocation(ServerContext context, Assignment assignment)
- throws DistributedStoreException {
- getStoreForTablet(assignment.tablet, context)
- .setLocations(Collections.singletonList(assignment));
+ public static void setLocation(ServerContext context, Assignment assignment,
+ TServerInstance prevLastLoc) {
+ getStoreForTablet(assignment.tablet, context).setLocation(assignment, prevLastLoc);
}
static TabletStateStore getStoreForTablet(KeyExtent extent, ServerContext context) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index e34911a..bd5cdf0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -103,13 +103,7 @@ class ZooTabletStateStore implements TabletStateStore {
}
@Override
- public void setFutureLocations(Collection<Assignment> assignments)
- throws DistributedStoreException {
- if (assignments.size() != 1)
- throw new IllegalArgumentException("There is only one root tablet");
- Assignment assignment = assignments.iterator().next();
- if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
- throw new IllegalArgumentException("You can only store the root tablet location");
+ public void setFutureLocation(Assignment assignment) {
TabletMutator tabletMutator = ample.mutateTablet(assignment.tablet);
tabletMutator.putLocation(assignment.server, LocationType.FUTURE);
@@ -117,23 +111,22 @@ class ZooTabletStateStore implements TabletStateStore {
}
@Override
- public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
- if (assignments.size() != 1)
- throw new IllegalArgumentException("There is only one root tablet");
- Assignment assignment = assignments.iterator().next();
- if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
- throw new IllegalArgumentException("You can only store the root tablet location");
-
+ public void setLocation(Assignment assignment, TServerInstance prevLastLoc) {
TabletMutator tabletMutator = ample.mutateTablet(assignment.tablet);
tabletMutator.putLocation(assignment.server, LocationType.CURRENT);
+ tabletMutator.putLocation(assignment.server, LocationType.LAST);
tabletMutator.deleteLocation(assignment.server, LocationType.FUTURE);
+ if (prevLastLoc != null && !prevLastLoc.equals(assignment.server)) {
+ tabletMutator.deleteLocation(prevLastLoc, LocationType.LAST);
+ }
+
tabletMutator.mutate();
}
@Override
public void unassign(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException {
+ Map<TServerInstance,List<Path>> logsForDeadServers) {
if (tablets.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
TabletLocationState tls = tablets.iterator().next();
@@ -162,8 +155,7 @@ class ZooTabletStateStore implements TabletStateStore {
@Override
public void suspend(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
- throws DistributedStoreException {
+ Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) {
// No support for suspending root tablet.
unassign(tablets, logsForDeadServers);
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
index 6dd9a75..bce340d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
@@ -198,6 +198,23 @@ public abstract class TabletMutatorBase implements Ample.TabletMutator {
return this;
}
+ @Override
+ public Ample.TabletMutator putSuspension(Ample.TServer tServer, long suspensionTime) {
+ Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
+ mutation.put(TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(),
+ TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier(),
+ new Value(tServer + "|" + suspensionTime));
+ return this;
+ }
+
+ @Override
+ public Ample.TabletMutator deleteSuspension() {
+ Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
+ mutation.putDelete(TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(),
+ TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier());
+ return this;
+ }
+
protected Mutation getMutation() {
updatesEnabled = false;
return mutation;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 366e941..f60aa36 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -181,8 +181,7 @@ public class MasterMetadataUtil {
public static void replaceDatafiles(ServerContext context, KeyExtent extent,
Set<StoredTabletFile> datafilesToDelete, Set<StoredTabletFile> scanFiles, TabletFile path,
- Long compactionId, DataFileValue size, String address, TServerInstance lastLocation,
- ZooLock zooLock) {
+ Long compactionId, DataFileValue size, ZooLock zooLock) {
context.getAmple().putGcCandidates(extent.getTableId(), datafilesToDelete);
@@ -197,13 +196,6 @@ public class MasterMetadataUtil {
if (compactionId != null)
tablet.putCompactionId(compactionId);
- TServerInstance self = getTServerInstance(address, zooLock);
- tablet.putLocation(self, LocationType.LAST);
-
- // remove the old location
- if (lastLocation != null && !lastLocation.equals(self))
- tablet.deleteLocation(lastLocation, LocationType.LAST);
-
tablet.putZooLock(zooLock);
tablet.mutate();
@@ -218,8 +210,8 @@ public class MasterMetadataUtil {
*/
public static StoredTabletFile updateTabletDataFile(ServerContext context, KeyExtent extent,
TabletFile path, StoredTabletFile mergeFile, DataFileValue dfv, MetadataTime time,
- Set<StoredTabletFile> filesInUseByScans, String address, ZooLock zooLock,
- Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
+ Set<StoredTabletFile> filesInUseByScans, ZooLock zooLock,
+ Set<String> unusedWalLogs, long flushId) {
TabletMutator tablet = context.getAmple().mutateTablet(extent);
StoredTabletFile newFile = null;
@@ -228,14 +220,6 @@ public class MasterMetadataUtil {
tablet.putFile(path, dfv);
tablet.putTime(time);
newFile = path.insert();
-
- TServerInstance self = getTServerInstance(address, zooLock);
- tablet.putLocation(self, LocationType.LAST);
-
- // remove the old location
- if (lastLocation != null && !lastLocation.equals(self)) {
- tablet.deleteLocation(lastLocation, LocationType.LAST);
- }
}
tablet.putFlushId(flushId);
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/state/RootTabletStateStoreTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/state/RootTabletStateStoreTest.java
index 88959f0..26a8876 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/master/state/RootTabletStateStoreTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/state/RootTabletStateStoreTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.util.Collections;
-import java.util.List;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
@@ -75,14 +74,14 @@ public class RootTabletStateStoreTest {
}
@Test
- public void testRootTabletStateStore() throws DistributedStoreException {
+ public void testRootTabletStateStore() {
ZooTabletStateStore tstore = new ZooTabletStateStore(new TestAmple());
KeyExtent root = RootTable.EXTENT;
String sessionId = "this is my unique session data";
TServerInstance server =
new TServerInstance(HostAndPort.fromParts("127.0.0.1", 10000), sessionId);
- List<Assignment> assignments = Collections.singletonList(new Assignment(root, server));
- tstore.setFutureLocations(assignments);
+ Assignment assignment = new Assignment(root, server);
+ tstore.setFutureLocation(assignment);
int count = 0;
for (TabletLocationState location : tstore) {
assertEquals(location.extent, root);
@@ -91,7 +90,7 @@ public class RootTabletStateStoreTest {
count++;
}
assertEquals(count, 1);
- tstore.setLocations(assignments);
+ tstore.setLocation(assignment, server);
count = 0;
for (TabletLocationState location : tstore) {
assertEquals(location.extent, root);
@@ -118,12 +117,12 @@ public class RootTabletStateStoreTest {
KeyExtent notRoot = new KeyExtent(TableId.of("0"), null, null);
try {
- tstore.setLocations(Collections.singletonList(new Assignment(notRoot, server)));
+ tstore.setLocation(new Assignment(notRoot, server), assigned.last);
fail("should not get here");
} catch (IllegalArgumentException ex) {}
try {
- tstore.setFutureLocations(Collections.singletonList(new Assignment(notRoot, server)));
+ tstore.setFutureLocation(new Assignment(notRoot, server));
fail("should not get here");
} catch (IllegalArgumentException ex) {}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 5f29286..49115bf 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -80,7 +80,6 @@ import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.ClosableIterator;
-import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.MergeInfo;
import org.apache.accumulo.server.master.state.MergeState;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -819,7 +818,7 @@ abstract class TabletGroupWatcher extends Daemon {
List<TabletLocationState> assignedToDeadServers,
Map<TServerInstance,List<Path>> logsForDeadServers,
List<TabletLocationState> suspendedToGoneServers, Map<KeyExtent,TServerInstance> unassigned)
- throws DistributedStoreException, TException, WalMarkerException {
+ throws TException, WalMarkerException {
boolean tabletsSuspendable = canSuspendTablets();
if (!assignedToDeadServers.isEmpty()) {
int maxServersToShow = min(assignedToDeadServers.size(), 100);
@@ -872,7 +871,9 @@ abstract class TabletGroupWatcher extends Daemon {
if (assignments.size() > 0) {
Master.log.info(String.format("Assigning %d tablets", assignments.size()));
- store.setFutureLocations(assignments);
+
+ for (Assignment assignment : assignments)
+ store.setFutureLocation(assignment);
}
assignments.addAll(assigned);
for (Assignment a : assignments) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index d1ef5ef..17a0519 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -192,7 +192,6 @@ import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.master.recovery.RecoveryPath;
import org.apache.accumulo.server.master.state.Assignment;
-import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
@@ -2364,8 +2363,6 @@ public class TabletServer extends AbstractServer {
TabletStateStore.suspend(getContext(), tls, null,
requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS));
}
- } catch (DistributedStoreException ex) {
- log.warn("Unable to update storage", ex);
} catch (KeeperException e) {
log.warn("Unable determine our zookeeper session information", e);
} catch (InterruptedException e) {
@@ -2508,7 +2505,7 @@ public class TabletServer extends AbstractServer {
throw new RuntimeException("Minor compaction after recovery fails for " + extent);
}
Assignment assignment = new Assignment(extent, getTabletSession());
- TabletStateStore.setLocation(getContext(), assignment);
+ TabletStateStore.setLocation(getContext(), assignment, data.getLastLocation());
synchronized (openingTablets) {
synchronized (onlineTablets) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 776b99a..4c0e9ed 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -548,9 +548,7 @@ class DatafileManager {
if (filesInUseByScans.size() > 0)
log.debug("Adding scan refs to metadata {} {}", extent, filesInUseByScans);
MasterMetadataUtil.replaceDatafiles(tablet.getContext(), extent, oldDatafiles,
- filesInUseByScans, newFile, compactionId, dfv,
- tablet.getTabletServer().getClientAddressString(), lastLocation,
- tablet.getTabletServer().getLock());
+ filesInUseByScans, newFile, compactionId, dfv, tablet.getTabletServer().getLock());
removeFilesAfterScan(filesInUseByScans);
if (log.isTraceEnabled()) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 50dd5d5..9d237ac 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -2684,10 +2684,9 @@ public class Tablet {
persistedTime = maxCommittedTime;
}
- return MasterMetadataUtil.updateTabletDataFile(getTabletServer().getContext(), extent,
- newDatafile, absMergeFile, dfv, tabletTime.getMetadataTime(persistedTime),
- filesInUseByScans, tabletServer.getClientAddressString(), tabletServer.getLock(),
- unusedWalLogs, lastLocation, flushId);
+ return MasterMetadataUtil.updateTabletDataFile(getTabletServer().getContext(), extent, newDatafile,
+ absMergeFile, dfv, tabletTime.getMetadataTime(persistedTime), filesInUseByScans,
+ tabletServer.getLock(), unusedWalLogs, flushId);
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
index b0537a1..26b7a97 100644
--- a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
@@ -28,20 +28,20 @@ import java.util.TreeSet;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.master.state.ClosableIterator;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -74,6 +74,7 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase {
// make some tablets, spread 'em around
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
ClientContext context = (ClientContext) c;
+ ServerContext serverContext = cluster.getServerContext();
String table = this.getUniqueNames(1)[0];
c.securityOperations().grantTablePermission("root", MetadataTable.NAME,
TablePermission.WRITE);
@@ -134,19 +135,16 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase {
}
assertNotEquals(null, moved);
// throw a mutation in as if we were the dying tablet
- BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation assignment = new Mutation(moved.extent.getMetadataEntry());
- moved.current.putLocation(assignment);
- bw.addMutation(assignment);
- bw.close();
+ TabletMutator tabletMutator = serverContext.getAmple().mutateTablet(moved.extent);
+ tabletMutator.putLocation(moved.current, LocationType.CURRENT);
+ tabletMutator.mutate();
// wait for the master to fix the problem
waitForCleanStore(store);
// now jam up the metadata table
- bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- assignment = new Mutation(new KeyExtent(MetadataTable.ID, null, null).getMetadataEntry());
- moved.current.putLocation(assignment);
- bw.addMutation(assignment);
- bw.close();
+ tabletMutator =
+ serverContext.getAmple().mutateTablet(new KeyExtent(MetadataTable.ID, null, null));
+ tabletMutator.putLocation(moved.current, LocationType.CURRENT);
+ tabletMutator.mutate();
waitForCleanStore(TabletStateStore.getStoreForLevel(DataLevel.METADATA, context));
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index c0760fa..6337ef9 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -37,10 +37,8 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.clientImpl.ScannerImpl;
-import org.apache.accumulo.core.clientImpl.Writer;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -48,11 +46,13 @@ import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.fate.zookeeper.ZooLock;
@@ -201,11 +201,11 @@ public class SplitRecoveryIT extends ConfigurableMacBase {
MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, context, zl);
TServerInstance instance = new TServerInstance(location, zl.getSessionId());
- Writer writer = MetadataTableUtil.getMetadataTable(context);
Assignment assignment = new Assignment(high, instance);
- Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
- assignment.server.putFutureLocation(m);
- writer.update(m);
+
+ TabletMutator tabletMutator = context.getAmple().mutateTablet(extent);
+ tabletMutator.putLocation(assignment.server, LocationType.FUTURE);
+ tabletMutator.mutate();
if (steps >= 1) {
Map<Long,List<TabletFile>> bulkFiles = getBulkFilesLoaded(context, extent);
diff --git a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
index e2b846f..3a081ab 100644
--- a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.HostAndPort;
@@ -180,9 +181,10 @@ public class MergeStateIT extends ConfigurableMacBase {
KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
m = tablet.getPrevRowUpdateMutation();
TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5"));
+ TabletMetadata tabletMetadata = context.getAmple().readTablet(tablet);
update(accumuloClient, m);
- metaDataStateStore
- .setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer)));
+ TServerInstance tServerInstance = new TServerInstance(tabletMetadata.getLast());
+ metaDataStateStore.setLocation(new Assignment(tablet, state.someTServer), tServerInstance);
// onos... there's a new tablet online
stats = scan(state, metaDataStateStore);
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index 9ce7525..b2ec934 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -321,11 +321,12 @@ public class NullTserver {
TabletLocationState next = s.next();
assignments.add(new Assignment(next.extent, instance));
}
- }
- // point them to this server
- TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, context);
- store.setLocations(assignments);
+ // point them to this server
+ TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, context);
+ for (Assignment assignment : assignments)
+ store.setLocation(assignment, instance);
+ }
while (true) {
sleepUninterruptibly(10, TimeUnit.SECONDS);
}