You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/04/26 18:07:57 UTC
[accumulo] branch master updated: fixes #1128 cleaned up upgrade
code (#1131)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 235d443 fixes #1128 cleaned up upgrade code (#1131)
235d443 is described below
commit 235d4438da00319fe06bebed93bd0e46bb98e7f5
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri Apr 26 14:07:52 2019 -0400
fixes #1128 cleaned up upgrade code (#1131)
Removed a lot of upgrade code for older versions that should never run.
Refactored most of the upgrade code out of the master. One goal of the
refactoring was to create a structure for upgrades that ensures the
upgrade code for each version is in a separate file. The existing code
for upgrading zookeeper was single method with stuff for lots of
different versions. This made understanding what upgrade steps went with
which versions very difficult.
---
.../apache/accumulo/server/ServerConstants.java | 56 ++--
.../org/apache/accumulo/server/ServerUtil.java | 20 +-
.../accumulo/server/util/MetadataTableUtil.java | 87 ------
.../org/apache/accumulo/server/ServerUtilTest.java | 28 ++
.../java/org/apache/accumulo/master/Master.java | 324 +--------------------
.../master/upgrade/UpgradeCoordinator.java | 114 ++++++++
.../apache/accumulo/master/upgrade/Upgrader.java | 35 +++
.../accumulo/master/upgrade/Upgrader8to9.java | 37 +++
8 files changed, 262 insertions(+), 439 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
index 0be499b..09d5c92 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
@@ -19,10 +19,10 @@ package org.apache.accumulo.server;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -34,6 +34,9 @@ import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
public class ServerConstants {
public static final String VERSION_DIR = "version";
@@ -48,40 +51,33 @@ public class ServerConstants {
public static final Integer WIRE_VERSION = 3;
/**
- * version (8) reflects changes to RFile index (ACCUMULO-1124) in version 1.8.0
- */
- public static final int SHORTEN_RFILE_KEYS = 8;
- /**
- * version (7) also reflects the addition of a replication table
- */
- public static final int MOVE_TO_REPLICATION_TABLE = 7;
- /**
- * this is the current data version
- */
- public static final int DATA_VERSION = SHORTEN_RFILE_KEYS;
- /**
- * version (6) reflects the addition of a separate root table (ACCUMULO-1481) in version 1.6.0
+ * version (9) reflects changes to crypto that resulted in RFiles and WALs being serialized
+ * differently in version 2.0.0. Also RFiles in 2.0.0 may have summary data.
*/
- public static final int MOVE_TO_ROOT_TABLE = 6;
+ public static final int CRYPTO_CHANGES = 9;
/**
- * version (5) moves delete file markers for the metadata table into the root tablet
+ * version (8) reflects changes to RFile index (ACCUMULO-1124) AND the change to WAL tracking in
+ * ZK in version 1.8.0
*/
- public static final int MOVE_DELETE_MARKERS = 5;
+ public static final int SHORTEN_RFILE_KEYS = 8;
+
/**
- * version (4) moves logging to HDFS in 1.5.0
+ * Historic data versions
+ *
+ * <ul>
+ * <li>version (7) also reflects the addition of a replication table in 1.7.0
+ * <li>version (6) reflects the addition of a separate root table (ACCUMULO-1481) in 1.6.0 -
+ * <li>version (5) moves delete file markers for the metadata table into the root tablet
+ * <li>version (4) moves logging to HDFS in 1.5.0
+ * </ul>
+ *
+ *
*/
- public static final int LOGGING_TO_HDFS = 4;
- public static final BitSet CAN_UPGRADE = new BitSet();
- static {
- for (int i : new int[] {DATA_VERSION, MOVE_TO_REPLICATION_TABLE, MOVE_TO_ROOT_TABLE}) {
- CAN_UPGRADE.set(i);
- }
- }
- public static final BitSet NEEDS_UPGRADE = new BitSet();
- static {
- NEEDS_UPGRADE.xor(CAN_UPGRADE);
- NEEDS_UPGRADE.clear(DATA_VERSION);
- }
+ public static final int DATA_VERSION = CRYPTO_CHANGES;
+
+ public static final Set<Integer> CAN_RUN = ImmutableSet.of(SHORTEN_RFILE_KEYS, DATA_VERSION);
+ public static final Set<Integer> NEEDS_UPGRADE =
+ Sets.difference(CAN_RUN, ImmutableSet.of(DATA_VERSION));
private static String[] baseUris = null;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java b/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
index a2a5bb9..d20fa98 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
@@ -113,10 +113,7 @@ public class ServerUtil {
log.info("Data Version {}", dataVersion);
ServerUtil.waitForZookeeperAndHdfs(context);
- if (!(canUpgradeFromDataVersion(dataVersion))) {
- throw new RuntimeException("This version of accumulo (" + Constants.VERSION
- + ") is not compatible with files stored using data version " + dataVersion);
- }
+ ensureDataVersionCompatible(dataVersion);
TreeMap<String,String> sortedProps = new TreeMap<>();
for (Entry<String,String> entry : conf)
@@ -152,14 +149,13 @@ public class ServerUtil {
}
/**
- * Sanity check that the current persistent version is allowed to upgrade to the version of
- * Accumulo running.
- *
- * @param dataVersion
- * the version that is persisted in the backing Volumes
+ * Check to see if this version of Accumulo can run against or upgrade the passed in data version.
*/
- public static boolean canUpgradeFromDataVersion(final int dataVersion) {
- return ServerConstants.CAN_UPGRADE.get(dataVersion);
+ public static void ensureDataVersionCompatible(int dataVersion) {
+ if (!(ServerConstants.CAN_RUN.contains(dataVersion))) {
+ throw new IllegalStateException("This version of accumulo (" + Constants.VERSION
+ + ") is not compatible with files stored using data version " + dataVersion);
+ }
}
/**
@@ -167,7 +163,7 @@ public class ServerUtil {
* something?
*/
public static boolean persistentVersionNeedsUpgrade(final int accumuloPersistentVersion) {
- return ServerConstants.NEEDS_UPGRADE.get(accumuloPersistentVersion);
+ return ServerConstants.NEEDS_UPGRADE.contains(accumuloPersistentVersion);
}
public static void monitorSwappiness(AccumuloConfiguration config) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index f4112be..73e3ecc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -17,9 +17,6 @@
package org.apache.accumulo.server.util;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.io.IOException;
@@ -72,7 +69,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Se
import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
-import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
@@ -89,7 +85,6 @@ import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeChooserEnvironment;
import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -106,7 +101,6 @@ import com.google.common.collect.Iterables;
public class MetadataTableUtil {
private static final Text EMPTY_TEXT = new Text();
- private static final byte[] EMPTY_BYTES = new byte[0];
private static Map<Credentials,Writer> root_tables = new HashMap<>();
private static Map<Credentials,Writer> metadata_tables = new HashMap<>();
private static final Logger log = LoggerFactory.getLogger(MetadataTableUtil.class);
@@ -1020,87 +1014,6 @@ public class MetadataTableUtil {
update(context, m, new KeyExtent(TableId.of("anythingNotMetadata"), null, null));
}
- /**
- * During an upgrade from 1.6 to 1.7, we need to add the replication table
- */
- public static void createReplicationTable(ServerContext context) {
-
- VolumeChooserEnvironment chooserEnv =
- new VolumeChooserEnvironmentImpl(ReplicationTable.ID, null, context);
- String dir = context.getVolumeManager().choose(chooserEnv, ServerConstants.getBaseUris(context))
- + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + ReplicationTable.ID
- + Constants.DEFAULT_TABLET_LOCATION;
-
- Mutation m = new Mutation(new Text(TabletsSection.getRow(ReplicationTable.ID, null)));
- m.put(DIRECTORY_COLUMN.getColumnFamily(), DIRECTORY_COLUMN.getColumnQualifier(), 0,
- new Value(dir.getBytes(UTF_8)));
- m.put(TIME_COLUMN.getColumnFamily(), TIME_COLUMN.getColumnQualifier(), 0,
- new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8)));
- m.put(PREV_ROW_COLUMN.getColumnFamily(), PREV_ROW_COLUMN.getColumnQualifier(), 0,
- KeyExtent.encodePrevEndRow(null));
- update(context, getMetadataTable(context), null, m);
- }
-
- /**
- * During an upgrade we need to move deletion requests for files under the !METADATA table to the
- * root tablet.
- */
- public static void moveMetaDeleteMarkers(ServerContext context) {
- String oldDeletesPrefix = "!!~del";
- Range oldDeletesRange = new Range(oldDeletesPrefix, true, "!!~dem", false);
-
- // move old delete markers to new location, to standardize table schema between all metadata
- // tables
- try (Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY)) {
- scanner.setRange(oldDeletesRange);
- for (Entry<Key,Value> entry : scanner) {
- String row = entry.getKey().getRow().toString();
- if (row.startsWith(oldDeletesPrefix)) {
- moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix);
- } else {
- break;
- }
- }
- }
- }
-
- public static void moveMetaDeleteMarkersFrom14(ServerContext context) {
- // new KeyExtent is only added to force update to write to the metadata table, not the root
- // table
- KeyExtent notMetadata = new KeyExtent(TableId.of("anythingNotMetadata"), null, null);
-
- // move delete markers from the normal delete keyspace to the root tablet delete keyspace if the
- // files are for the !METADATA table
- try (Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
- scanner.setRange(MetadataSchema.DeletesSection.getRange());
- for (Entry<Key,Value> entry : scanner) {
- String row = entry.getKey().getRow().toString();
- if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) {
- moveDeleteEntry(context, notMetadata, entry, row,
- MetadataSchema.DeletesSection.getRowPrefix());
- } else {
- break;
- }
- }
- }
- }
-
- private static void moveDeleteEntry(ServerContext context, KeyExtent oldExtent,
- Entry<Key,Value> entry, String rowID, String prefix) {
- String filename = rowID.substring(prefix.length());
-
- // add the new entry first
- log.info("Moving {} marker in {}", filename, RootTable.NAME);
- Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + filename);
- m.put(EMPTY_BYTES, EMPTY_BYTES, EMPTY_BYTES);
- update(context, m, RootTable.EXTENT);
-
- // then remove the old entry
- m = new Mutation(entry.getKey().getRow());
- m.putDelete(EMPTY_BYTES, EMPTY_BYTES);
- update(context, m, oldExtent);
- }
-
public static SortedMap<Text,SortedMap<ColumnFQ,Value>>
getTabletEntries(SortedMap<Key,Value> tabletKeyValues, List<ColumnFQ> columns) {
TreeMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries = new TreeMap<>();
diff --git a/server/base/src/test/java/org/apache/accumulo/server/ServerUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/ServerUtilTest.java
new file mode 100644
index 0000000..25466c1
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/ServerUtilTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.server;
+
+import org.junit.Test;
+
+public class ServerUtilTest {
+ @Test(expected = IllegalStateException.class)
+ public void testCanRun() {
+ // ensure this fails with older versions
+ ServerUtil.ensureDataVersionCompatible(7);
+ }
+}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 3577530..30c284a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -34,9 +34,10 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,8 +47,6 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.clientImpl.Namespace;
-import org.apache.accumulo.core.clientImpl.Namespaces;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -56,7 +55,6 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -72,18 +70,13 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.NamespacePermission;
-import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.fate.AgeOffStore;
import org.apache.accumulo.fate.Fate;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
@@ -96,18 +89,13 @@ import org.apache.accumulo.master.replication.MasterReplicationCoordinator;
import org.apache.accumulo.master.replication.ReplicationDriver;
import org.apache.accumulo.master.replication.WorkDriver;
import org.apache.accumulo.master.state.TableCounts;
+import org.apache.accumulo.master.upgrade.UpgradeCoordinator;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.HighlyAvailableService;
-import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
-import org.apache.accumulo.server.ServerUtil;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.fs.VolumeChooserEnvironment;
-import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.master.LiveTServerSet;
@@ -138,12 +126,10 @@ import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.delegation.AuthenticationTokenKeyManager;
import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyDistributor;
-import org.apache.accumulo.server.security.handler.ZKPermHandler;
import org.apache.accumulo.server.tables.TableManager;
import org.apache.accumulo.server.tables.TableObserver;
import org.apache.accumulo.server.util.DefaultMap;
import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.util.ServerBulkImportStatus;
import org.apache.accumulo.server.util.TableInfoUtil;
import org.apache.accumulo.server.util.time.SimpleTimer;
@@ -167,7 +153,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -270,309 +255,26 @@ public class Master extends AbstractServer
}
if (oldState != newState && (newState == MasterState.HAVE_LOCK)) {
- upgradeZookeeper();
+ upgradeCoordinator.upgradeZookeeper();
}
if (oldState != newState && (newState == MasterState.NORMAL)) {
- upgradeMetadata();
- }
- }
-
- private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws Exception {
- ServerContext context = getContext();
- String dirZPath = getZooKeeperRoot() + RootTable.ZROOT_TABLET_PATH;
-
- if (!zoo.exists(dirZPath)) {
- Path oldPath = fs.getFullPath(FileType.TABLE, "/" + MetadataTable.ID + "/root_tablet");
- if (fs.exists(oldPath)) {
- VolumeChooserEnvironment chooserEnv =
- new VolumeChooserEnvironmentImpl(RootTable.ID, RootTable.EXTENT.getEndRow(), context);
- String newPath = fs.choose(chooserEnv, ServerConstants.getBaseUris(context))
- + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + RootTable.ID;
- fs.mkdirs(new Path(newPath));
- if (!fs.rename(oldPath, new Path(newPath))) {
- throw new IOException("Failed to move root tablet from " + oldPath + " to " + newPath);
- }
-
- log.info("Upgrade renamed {} to {}", oldPath, newPath);
- }
-
- Path location = null;
-
- for (String basePath : ServerConstants.getTablesDirs(context)) {
- Path path = new Path(basePath + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
- if (fs.exists(path)) {
- if (location != null) {
- throw new IllegalStateException(
- "Root table at multiple locations " + location + " " + path);
- }
-
- location = path;
- }
- }
-
- if (location == null) {
- throw new IllegalStateException("Failed to find root tablet");
- }
-
- log.info("Upgrade setting root table location in zookeeper {}", location);
- zoo.putPersistentData(dirZPath, location.toString().getBytes(), NodeExistsPolicy.FAIL);
- }
- }
-
- private boolean haveUpgradedZooKeeper = false;
-
- @SuppressFBWarnings(value = "DM_EXIT",
- justification = "TODO probably not the best to call System.exit here")
- private void upgradeZookeeper() {
- ServerContext context = getContext();
-
- // 1.5.1 and 1.6.0 both do some state checking after obtaining the zoolock for the
- // monitor and before starting up. It's not tied to the data version at all (and would
- // introduce unnecessary complexity to try to make the master do it), but be aware
- // that the master is not the only thing that may alter zookeeper before starting.
-
- final int accumuloPersistentVersion = ServerUtil.getAccumuloPersistentVersion(fs);
- if (ServerUtil.persistentVersionNeedsUpgrade(accumuloPersistentVersion)) {
- // This Master hasn't started Fate yet, so any outstanding transactions must be from before
- // the upgrade.
- // Change to Guava's Verify once we use Guava 17.
if (fate != null) {
throw new IllegalStateException("Access to Fate should not have been"
- + " initialized prior to the Master transitioning to active. Please"
- + " save all logs and file a bug.");
- }
- ServerUtil.abortIfFateTransactions(context);
- try {
- log.info("Upgrading zookeeper");
-
- IZooReaderWriter zoo = context.getZooReaderWriter();
- final String zooRoot = getZooKeeperRoot();
-
- log.debug("Handling updates for version {}", accumuloPersistentVersion);
-
- log.debug("Cleaning out remnants of logger role.");
- zoo.recursiveDelete(zooRoot + "/loggers", NodeMissingPolicy.SKIP);
- zoo.recursiveDelete(zooRoot + "/dead/loggers", NodeMissingPolicy.SKIP);
-
- final byte[] zero = {'0'};
- log.debug("Initializing recovery area.");
- zoo.putPersistentData(zooRoot + Constants.ZRECOVERY, zero, NodeExistsPolicy.SKIP);
-
- for (String id : zoo.getChildren(zooRoot + Constants.ZTABLES)) {
- log.debug("Prepping table {} for compaction cancellations.", id);
- zoo.putPersistentData(
- zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_COMPACT_CANCEL_ID, zero,
- NodeExistsPolicy.SKIP);
- }
-
- @SuppressWarnings("deprecation")
- String zpath = zooRoot + Constants.ZCONFIG + "/" + Property.TSERV_WAL_SYNC_METHOD.getKey();
- // is the entire instance set to use flushing vs sync?
- boolean flushDefault = false;
- try {
- byte[] data = zoo.getData(zpath, null);
- if (new String(data, UTF_8).endsWith("flush")) {
- flushDefault = true;
- }
- } catch (KeeperException.NoNodeException ex) {
- // skip
- }
- for (String id : zoo.getChildren(zooRoot + Constants.ZTABLES)) {
- log.debug("Converting table {} WALog setting to Durability", id);
- try {
- @SuppressWarnings("deprecation")
- String path = zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_CONF + "/"
- + Property.TABLE_WALOG_ENABLED.getKey();
- byte[] data = zoo.getData(path, null);
- boolean useWAL = Boolean.parseBoolean(new String(data, UTF_8));
- zoo.recursiveDelete(path, NodeMissingPolicy.FAIL);
- path = zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_CONF + "/"
- + Property.TABLE_DURABILITY.getKey();
- if (useWAL) {
- if (flushDefault) {
- zoo.putPersistentData(path, "flush".getBytes(), NodeExistsPolicy.SKIP);
- } else {
- zoo.putPersistentData(path, "sync".getBytes(), NodeExistsPolicy.SKIP);
- }
- } else {
- zoo.putPersistentData(path, "none".getBytes(), NodeExistsPolicy.SKIP);
- }
- } catch (KeeperException.NoNodeException ex) {
- // skip it
- }
- }
-
- // create initial namespaces
- String namespaces = getZooKeeperRoot() + Constants.ZNAMESPACES;
- zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP);
- for (Pair<String,NamespaceId> namespace : Iterables.concat(
- Collections.singleton(new Pair<>(Namespace.ACCUMULO.name(), Namespace.ACCUMULO.id())),
- Collections.singleton(new Pair<>(Namespace.DEFAULT.name(), Namespace.DEFAULT.id())))) {
- String ns = namespace.getFirst();
- NamespaceId id = namespace.getSecond();
- log.debug("Upgrade creating namespace \"{}\" (ID: {})", ns, id);
- if (!Namespaces.exists(context, id)) {
- TableManager.prepareNewNamespaceState(zoo, getInstanceID(), id, ns,
- NodeExistsPolicy.SKIP);
- }
- }
-
- // create replication table in zk
- log.debug("Upgrade creating table {} (ID: {})", ReplicationTable.NAME, ReplicationTable.ID);
- TableManager.prepareNewTableState(zoo, getInstanceID(), ReplicationTable.ID,
- Namespace.ACCUMULO.id(), ReplicationTable.NAME, TableState.OFFLINE,
- NodeExistsPolicy.SKIP);
-
- // create root table
- log.debug("Upgrade creating table {} (ID: {})", RootTable.NAME, RootTable.ID);
- TableManager.prepareNewTableState(zoo, getInstanceID(), RootTable.ID,
- Namespace.ACCUMULO.id(), RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.SKIP);
- Initialize.initSystemTablesConfig(context.getZooReaderWriter(), context.getZooKeeperRoot(),
- context.getHadoopConf());
- // ensure root user can flush root table
- security.grantTablePermission(context.rpcCreds(), security.getRootUsername(), RootTable.ID,
- TablePermission.ALTER_TABLE, Namespace.ACCUMULO.id());
-
- // put existing tables in the correct namespaces
- String tables = getZooKeeperRoot() + Constants.ZTABLES;
- for (String tableId : zoo.getChildren(tables)) {
- NamespaceId targetNamespace = (MetadataTable.ID.canonical().equals(tableId)
- || RootTable.ID.canonical().equals(tableId)) ? Namespace.ACCUMULO.id()
- : Namespace.DEFAULT.id();
- log.debug("Upgrade moving table {} (ID: {}) into namespace with ID {}",
- new String(zoo.getData(tables + "/" + tableId + Constants.ZTABLE_NAME, null), UTF_8),
- tableId, targetNamespace);
- zoo.putPersistentData(tables + "/" + tableId + Constants.ZTABLE_NAMESPACE,
- targetNamespace.canonical().getBytes(UTF_8), NodeExistsPolicy.SKIP);
- }
-
- // rename metadata table
- log.debug("Upgrade renaming table {} (ID: {}) to {}", MetadataTable.OLD_NAME,
- MetadataTable.ID, MetadataTable.NAME);
- zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME,
- Tables.qualify(MetadataTable.NAME).getSecond().getBytes(UTF_8),
- NodeExistsPolicy.OVERWRITE);
-
- moveRootTabletToRootTable(zoo);
-
- // add system namespace permissions to existing users
- // N.B. this section is ignoring the configured PermissionHandler
- // under the assumption that these details are in zk and we can
- // modify the structure so long as we pass back in whatever we read.
- // This is true for any permission handler, including KerberosPermissionHandler,
- // that uses the ZKPermHandler for permissions storage so long
- // as the PermHandler only overrides the user name, and we don't care what the user name is.
- ZKPermHandler perm = new ZKPermHandler();
- perm.initialize(context, true);
- String users = getZooKeeperRoot() + "/users";
- for (String user : zoo.getChildren(users)) {
- zoo.putPersistentData(users + "/" + user + "/Namespaces", new byte[0],
- NodeExistsPolicy.SKIP);
- perm.grantNamespacePermission(user, Namespace.ACCUMULO.id().canonical(),
- NamespacePermission.READ);
- }
- // because we need to refer to the root username, we can't use the
- // ZKPermHandler directly since that violates our earlier assumption that we don't
- // care about contents of the username. When using a PermissionHandler that needs to
- // encode the username in some way, i.e. the KerberosPermissionHandler, things would
- // fail. Instead we should be able to use the security object since
- // the loop above should have made the needed structure in ZK.
- security.grantNamespacePermission(context.rpcCreds(), security.getRootUsername(),
- Namespace.ACCUMULO.id(), NamespacePermission.ALTER_TABLE);
-
- // add the currlog location for root tablet current logs
- zoo.putPersistentData(getZooKeeperRoot() + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0],
- NodeExistsPolicy.SKIP);
-
- // create tablet server wal logs node in ZK
- zoo.putPersistentData(getZooKeeperRoot() + WalStateManager.ZWALS, new byte[0],
- NodeExistsPolicy.SKIP);
-
- haveUpgradedZooKeeper = true;
- } catch (Exception ex) {
- // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
- log.error("FATAL: Error performing upgrade", ex);
- System.exit(1);
+ + " initialized prior to the Master finishing upgrades. Please save"
+ + " all logs and file a bug.");
}
+ upgradeMetadataFuture = upgradeCoordinator.upgradeMetadata();
}
}
- private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);
- private final CountDownLatch waitForMetadataUpgrade = new CountDownLatch(1);
+ private UpgradeCoordinator upgradeCoordinator;
+ private Future<Void> upgradeMetadataFuture;
private final ServerConfigurationFactory serverConfig;
private MasterClientServiceHandler clientHandler;
- private void upgradeMetadata() {
- // we make sure we're only doing the rest of this method once so that we can signal to other
- // threads that an upgrade wasn't needed.
- if (upgradeMetadataRunning.compareAndSet(false, true)) {
- final int accumuloPersistentVersion = ServerUtil.getAccumuloPersistentVersion(fs);
- if (ServerUtil.persistentVersionNeedsUpgrade(accumuloPersistentVersion)) {
- // sanity check that we passed the Fate verification prior to ZooKeeper upgrade, and that
- // Fate still hasn't been started.
- // Change both to use Guava's Verify once we use Guava 17.
- if (!haveUpgradedZooKeeper) {
- throw new IllegalStateException("We should only attempt to upgrade"
- + " Accumulo's metadata table if we've already upgraded ZooKeeper."
- + " Please save all logs and file a bug.");
- }
- if (fate != null) {
- throw new IllegalStateException("Access to Fate should not have been"
- + " initialized prior to the Master finishing upgrades. Please save"
- + " all logs and file a bug.");
- }
- Runnable upgradeTask = new Runnable() {
- int version = accumuloPersistentVersion;
-
- @SuppressFBWarnings(value = "DM_EXIT",
- justification = "TODO probably not the best to call System.exit here")
- @Override
- public void run() {
- ServerContext context = getContext();
- try {
- log.info("Starting to upgrade metadata table.");
- if (version == ServerConstants.MOVE_DELETE_MARKERS - 1) {
- log.info("Updating Delete Markers in metadata table for version 1.4");
- MetadataTableUtil.moveMetaDeleteMarkersFrom14(context);
- version++;
- }
- if (version == ServerConstants.MOVE_TO_ROOT_TABLE - 1) {
- log.info("Updating Delete Markers in metadata table.");
- MetadataTableUtil.moveMetaDeleteMarkers(context);
- version++;
- }
- if (version == ServerConstants.MOVE_TO_REPLICATION_TABLE - 1) {
- log.info("Updating metadata table with entries for the replication table");
- MetadataTableUtil.createReplicationTable(context);
- version++;
- }
- log.info("Updating persistent data version.");
- ServerUtil.updateAccumuloVersion(fs, accumuloPersistentVersion);
- log.info("Upgrade complete");
- waitForMetadataUpgrade.countDown();
- } catch (Exception ex) {
- // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j
- // compatibility
- log.error("FATAL: Error performing upgrade", ex);
- System.exit(1);
- }
-
- }
- };
-
- // need to run this in a separate thread because a lock is held that prevents metadata
- // tablets from being assigned and this task writes to the
- // metadata table
- new Thread(upgradeTask).start();
- } else {
- waitForMetadataUpgrade.countDown();
- }
- }
- }
-
private int assignedOrHosted(TableId tableId) {
int result = 0;
for (TabletGroupWatcher watcher : watchers) {
@@ -716,6 +418,7 @@ public class Master extends AbstractServer
delegationTokensAvailable = false;
}
+ upgradeCoordinator = new UpgradeCoordinator(context);
}
public String getInstanceID() {
@@ -1363,9 +1066,10 @@ public class Master extends AbstractServer
// Once we are sure the upgrade is complete, we can safely allow fate use.
try {
- waitForMetadataUpgrade.await();
- } catch (InterruptedException e) {
- throw new IllegalStateException("Metadata upgrade interrupted", e);
+ // wait for metadata upgrade running in background to complete
+ upgradeMetadataFuture.get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IllegalStateException("Metadata upgrade failed", e);
}
try {
diff --git a/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
new file mode 100644
index 0000000..4e901b7
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.master.upgrade;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class UpgradeCoordinator {
+
+ private static Logger log = LoggerFactory.getLogger(UpgradeCoordinator.class);
+
+ private ServerContext context;
+ private boolean haveUpgradedZooKeeper = false;
+ private boolean startedMetadataUpgrade = false;
+ private int currentVersion;
+ private Map<Integer,Upgrader> upgraders =
+ ImmutableMap.of(ServerConstants.SHORTEN_RFILE_KEYS, new Upgrader8to9());
+
+ public UpgradeCoordinator(ServerContext ctx) {
+ int currentVersion = ServerUtil.getAccumuloPersistentVersion(ctx.getVolumeManager());
+
+ ServerUtil.ensureDataVersionCompatible(currentVersion);
+
+ this.currentVersion = currentVersion;
+ this.context = ctx;
+ }
+
+ @SuppressFBWarnings(value = "DM_EXIT",
+ justification = "Want to immediately stop all master threads on upgrade error")
+ private void handleFailure(Exception e) {
+ log.error("FATAL: Error performing upgrade", e);
+ System.exit(1);
+ }
+
+ public synchronized void upgradeZookeeper() {
+ if (haveUpgradedZooKeeper)
+ throw new IllegalStateException("Only expect this method to be called once");
+
+ try {
+ if (currentVersion < ServerConstants.DATA_VERSION) {
+ ServerUtil.abortIfFateTransactions(context);
+
+ for (int v = currentVersion; v < ServerConstants.DATA_VERSION; v++) {
+ log.info("Upgrading Zookeeper from data version {}", v);
+ upgraders.get(v).upgradeZookeeper(context);
+ }
+ }
+
+ haveUpgradedZooKeeper = true;
+ } catch (Exception e) {
+ handleFailure(e);
+ }
+ }
+
+ public synchronized Future<Void> upgradeMetadata() {
+ if (startedMetadataUpgrade)
+ throw new IllegalStateException("Only expect this method to be called once");
+
+ if (!haveUpgradedZooKeeper) {
+ throw new IllegalStateException("We should only attempt to upgrade"
+ + " Accumulo's metadata table if we've already upgraded ZooKeeper."
+ + " Please save all logs and file a bug.");
+ }
+
+ startedMetadataUpgrade = true;
+
+ if (currentVersion < ServerConstants.DATA_VERSION) {
+ return Executors.newCachedThreadPool().submit(() -> {
+ try {
+ for (int v = currentVersion; v < ServerConstants.DATA_VERSION; v++) {
+ log.info("Upgrading Metadata from data version {}", v);
+ upgraders.get(v).upgradeMetadata(context);
+ }
+
+ log.info("Updating persistent data version.");
+ ServerUtil.updateAccumuloVersion(context.getVolumeManager(), currentVersion);
+ log.info("Upgrade complete");
+ } catch (Exception e) {
+ handleFailure(e);
+ }
+ return null;
+ });
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader.java b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader.java
new file mode 100644
index 0000000..fd68037
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.master.upgrade;
+
+import org.apache.accumulo.server.ServerContext;
+
+/**
+ * The purpose of this interface is to allow per version upgrade implementations to be created.
+ * Keeping the code for upgrading each version separate makes it easier to maintain and understand
+ * the upgrade code over time.
+ *
+ * <p>
+ * Upgrade operations should be idempotent. For failure cases upgrade operations may partially
+ * complete and then be run again later.
+ */
+public interface Upgrader {
+ void upgradeZookeeper(ServerContext ctx);
+
+ void upgradeMetadata(ServerContext ctx);
+}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader8to9.java b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader8to9.java
new file mode 100644
index 0000000..c7106a8
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader8to9.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.master.upgrade;
+
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.ServerContext;
+
+/**
+ * See {@link ServerConstants#CRYPTO_CHANGES}
+ */
+public class Upgrader8to9 implements Upgrader {
+
+ @Override
+ public void upgradeZookeeper(ServerContext ctx) {
+ // There is no action that needs to be taken for zookeeper
+ }
+
+ @Override
+ public void upgradeMetadata(ServerContext ctx) {
+ // There is no action that needs to be taken for metadata
+ }
+}