You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:21:16 UTC
[30/34] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table updates
ACCUMULO-3423 optimize WAL metadata table updates
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3fdd29f5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3fdd29f5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3fdd29f5
Branch: refs/heads/master
Commit: 3fdd29f5222f9d1d32ca28b5ecf1d740a8d20f87
Parents: ea25e98
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Apr 24 18:15:05 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Apr 24 18:18:56 2015 -0400
----------------------------------------------------------------------
.../client/impl/ReplicationOperationsImpl.java | 4 +-
.../org/apache/accumulo/core/conf/Property.java | 4 +-
.../accumulo/core/metadata/RootTable.java | 1 +
.../core/metadata/schema/MetadataSchema.java | 48 ++
.../core/tabletserver/log/LogEntry.java | 78 ++-
.../core/metadata/MetadataTableSchemaTest.java | 47 ++
.../org/apache/accumulo/server/TabletLevel.java | 34 ++
.../apache/accumulo/server/fs/VolumeUtil.java | 22 +-
.../apache/accumulo/server/init/Initialize.java | 1 +
.../server/master/state/MetaDataStateStore.java | 47 +-
.../master/state/MetaDataTableScanner.java | 6 +-
.../master/state/TabletLocationState.java | 7 +
.../server/master/state/TabletStateStore.java | 16 +-
.../master/state/ZooTabletStateStore.java | 35 +-
.../accumulo/server/replication/StatusUtil.java | 13 +
.../accumulo/server/util/ListVolumesUsed.java | 18 +-
.../server/util/MasterMetadataUtil.java | 18 +-
.../accumulo/server/util/MetadataTableUtil.java | 239 +++++---
.../server/util/ReplicationTableUtil.java | 13 +-
.../server/util/ReplicationTableUtilTest.java | 2 +-
.../gc/GarbageCollectWriteAheadLogs.java | 499 +++++++---------
.../accumulo/gc/SimpleGarbageCollector.java | 1 -
.../CloseWriteAheadLogReferences.java | 23 +-
.../gc/GarbageCollectWriteAheadLogsTest.java | 567 -------------------
.../CloseWriteAheadLogReferencesTest.java | 151 +----
.../java/org/apache/accumulo/master/Master.java | 3 +
.../master/MasterClientServiceHandler.java | 3 +-
.../accumulo/master/TabletGroupWatcher.java | 37 +-
.../accumulo/master/replication/WorkMaker.java | 1 +
.../accumulo/master/state/MergeStats.java | 3 +-
.../master/ReplicationOperationsImplTest.java | 9 +-
.../apache/accumulo/master/TestMergeState.java | 2 +-
.../master/state/RootTabletStateStoreTest.java | 4 +-
.../src/main/findbugs/exclude-filter.xml | 2 +-
.../server/GarbageCollectionLogger.java | 3 +-
.../apache/accumulo/tserver/TabletServer.java | 182 +++---
.../apache/accumulo/tserver/log/DfsLogger.java | 14 +-
.../accumulo/tserver/log/SortedLogRecovery.java | 8 +-
.../tserver/log/TabletServerLogger.java | 187 +++---
.../accumulo/tserver/tablet/CommitSession.java | 3 +-
.../tserver/tablet/DatafileManager.java | 4 +-
.../apache/accumulo/tserver/tablet/Tablet.java | 59 +-
.../tserver/tablet/TabletCommitter.java | 3 +-
.../accumulo/tserver/log/LogEntryTest.java | 56 ++
.../test/performance/thrift/NullTserver.java | 6 +-
.../accumulo/proxy/ProxyDurabilityIT.java | 9 +-
.../test/BadDeleteMarkersCreatedIT.java | 2 +-
.../org/apache/accumulo/test/BalanceIT.java | 20 +-
.../org/apache/accumulo/test/CleanWalIT.java | 1 +
.../accumulo/test/ConditionalWriterIT.java | 1 +
.../accumulo/test/GarbageCollectWALIT.java | 81 +++
.../MissingWalHeaderCompletesRecoveryIT.java | 14 +-
.../accumulo/test/NoMutationRecoveryIT.java | 178 ------
.../org/apache/accumulo/test/ShellServerIT.java | 2 +-
.../org/apache/accumulo/test/UnusedWALIT.java | 144 +++++
.../java/org/apache/accumulo/test/VolumeIT.java | 17 +
.../accumulo/test/functional/ReadWriteIT.java | 8 +
.../accumulo/test/functional/WALSunnyDayIT.java | 250 ++++++++
.../test/functional/WatchTheWatchCountIT.java | 2 +-
.../test/performance/RollWALPerformanceIT.java | 126 +++++
...bageCollectorCommunicatesWithTServersIT.java | 35 +-
.../replication/MultiInstanceReplicationIT.java | 2 +-
.../test/replication/ReplicationIT.java | 370 ++++--------
63 files changed, 1857 insertions(+), 1888 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 6a5c74a..925877d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -153,9 +153,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
try {
for (Entry<Key,Value> entry : metaBs) {
LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String log : logEntry.logSet) {
- wals.add(new Path(log).toString());
- }
+ wals.add(new Path(logEntry.filename).toString());
}
} finally {
metaBs.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 429abad..a5bef0a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -345,8 +345,8 @@ public enum Property {
+ "no longer in use are removed from the filesystem."),
GC_PORT("gc.port.client", "50091", PropertyType.PORT, "The listening port for the garbage collector's monitor service"),
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"),
- GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"),
- GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"),
+ GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured."),
+ GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting."),
GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"),
// properties that are specific to the monitor server behavior
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
index 292ba3b..97d73d1 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
@@ -41,6 +41,7 @@ public class RootTable {
public static final String ZROOT_TABLET_FUTURE_LOCATION = ZROOT_TABLET + "/future_location";
public static final String ZROOT_TABLET_LAST_LOCATION = ZROOT_TABLET + "/lastlocation";
public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs";
+ public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET + "/current_logs";
public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir";
public static final KeyExtent EXTENT = new KeyExtent(new Text(ID), null, null);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 6baae17..c787d6d 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -16,11 +16,14 @@
*/
package org.apache.accumulo.core.metadata.schema;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.schema.Section;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.hadoop.io.Text;
@@ -278,4 +281,49 @@ public class MetadataSchema {
buff.set(buff.getBytes(), section.getRowPrefix().length(), buff.getLength() - section.getRowPrefix().length());
}
}
+
+  /**
+   * Holds references to the WALs in use in a live Tablet Server.
+   * <p>
+   * <code>~wal+tserver:port[sessionId] log:hdfs://localhost:8020/accumulo/wal/tserver+port/WAL [] -></code>
+   */
+  public static class CurrentLogsSection {
+    private static final Section section = new Section(RESERVED_PREFIX + "wal+", true, RESERVED_PREFIX + "wal,", false);
+    private static final byte LEFT_BRACKET = (byte) '[';
+    public static final Text COLF = new Text("log");
+    public static final Value UNUSED = new Value("unused".getBytes(UTF_8));
+
+    public static Range getRange() {
+      return section.getRange();
+    }
+
+    public static String getRowPrefix() {
+      return section.getRowPrefix();
+    }
+
+    /** Splits the row of {@code k}, "prefix host:port[session]", into hostPort and session; throws IllegalArgumentException when malformed. */
+    public static void getTabletServer(Key k, Text hostPort, Text session) {
+      Preconditions.checkNotNull(k);
+      Preconditions.checkNotNull(hostPort);
+      Preconditions.checkNotNull(session);
+      final String prefix = section.getRowPrefix();
+      Text row = new Text();
+      k.getRow(row);
+      if (!row.toString().startsWith(prefix) || !row.toString().endsWith("]")) { // the session must be closed by ']'
+        throw new IllegalArgumentException("Bad key " + k.toString());
+      }
+      for (int sessionStart = prefix.length(); sessionStart < row.getLength() - 1; sessionStart++) {
+        if (row.charAt(sessionStart) == LEFT_BRACKET) {
+          hostPort.set(row.getBytes(), prefix.length(), sessionStart - prefix.length());
+          session.set(row.getBytes(), sessionStart + 1, row.getLength() - sessionStart - 2); // skip '[' and the trailing ']'
+          return;
+        }
+      }
+      throw new IllegalArgumentException("Bad key " + k.toString());
+    }
+
+    public static void getPath(Key k, Text path) {
+      k.getColumnQualifier(path); // the column qualifier holds the WAL path, as in the layout shown on the class
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index 7fe61d1..ab70bb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@ -16,10 +16,10 @@
*/
package org.apache.accumulo.core.tabletserver.log;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -29,30 +29,29 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
-import com.google.common.base.Joiner;
-
public class LogEntry {
- public KeyExtent extent;
- public long timestamp;
- public String server;
- public String filename;
- public int tabletId;
- public Collection<String> logSet;
-
- public LogEntry() {}
+ public final KeyExtent extent;
+ public final long timestamp;
+ public final String server;
+ public final String filename;
public LogEntry(LogEntry le) {
this.extent = le.extent;
this.timestamp = le.timestamp;
this.server = le.server;
this.filename = le.filename;
- this.tabletId = le.tabletId;
- this.logSet = new ArrayList<String>(le.logSet);
+ }
+
+ public LogEntry(KeyExtent extent, long timestamp, String server, String filename) {
+ this.extent = extent;
+ this.timestamp = timestamp;
+ this.server = server;
+ this.filename = filename;
}
@Override
public String toString() {
- return extent.toString() + " " + filename + " (" + tabletId + ")";
+ return extent.toString() + " " + filename;
}
public String getName() {
@@ -65,43 +64,35 @@ public class LogEntry {
out.writeLong(timestamp);
out.writeUTF(server);
out.writeUTF(filename);
- out.write(tabletId);
- out.write(logSet.size());
- for (String s : logSet) {
- out.writeUTF(s);
- }
return Arrays.copyOf(out.getData(), out.getLength());
}
- public void fromBytes(byte bytes[]) throws IOException {
+ static public LogEntry fromBytes(byte bytes[]) throws IOException {
DataInputBuffer inp = new DataInputBuffer();
inp.reset(bytes, bytes.length);
- extent = new KeyExtent();
+ KeyExtent extent = new KeyExtent();
extent.readFields(inp);
- timestamp = inp.readLong();
- server = inp.readUTF();
- filename = inp.readUTF();
- tabletId = inp.read();
- int count = inp.read();
- ArrayList<String> logSet = new ArrayList<String>(count);
- for (int i = 0; i < count; i++)
- logSet.add(inp.readUTF());
- this.logSet = logSet;
+ long timestamp = inp.readLong();
+ String server = inp.readUTF();
+ String filename = inp.readUTF();
+ return new LogEntry(extent, timestamp, server, filename);
}
static private final Text EMPTY_TEXT = new Text();
public static LogEntry fromKeyValue(Key key, Value value) {
- LogEntry result = new LogEntry();
- result.extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
+ String qualifier = key.getColumnQualifier().toString();
+ if (qualifier.indexOf('/') < 1) {
+ throw new IllegalArgumentException("Bad key for log entry: " + key);
+ }
+ KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
String[] parts = key.getColumnQualifier().toString().split("/", 2);
- result.server = parts[0];
- result.filename = parts[1];
- parts = value.toString().split("\\|");
- result.tabletId = Integer.parseInt(parts[1]);
- result.logSet = Arrays.asList(parts[0].split(";"));
- result.timestamp = key.getTimestamp();
- return result;
+ String server = parts[0];
+ // handle old-style log entries that specify log sets
+ parts = value.toString().split("\\|")[0].split(";");
+ String filename = parts[parts.length - 1];
+ long timestamp = key.getTimestamp();
+ return new LogEntry(extent, timestamp, server, filename);
}
public Text getRow() {
@@ -112,11 +103,16 @@ public class LogEntry {
return MetadataSchema.TabletsSection.LogColumnFamily.NAME;
}
+  public String getUniqueID() {
+    final String[] segments = filename.split("/"); // the ID is the last path component of the file name
+    return segments[segments.length - 1];
+  }
+
public Text getColumnQualifier() {
return new Text(server + "/" + filename);
}
public Value getValue() {
- return new Value((Joiner.on(";").join(logSet) + "|" + tabletId).getBytes());
+ return new Value(filename.getBytes(UTF_8));
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
new file mode 100644
index 0000000..cfe59f2
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.metadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class MetadataTableSchemaTest {
+ // Checks that CurrentLogsSection.getTabletServer parses a well-formed row and rejects a row with a bad prefix.
+ @Test
+ public void testGetTabletServer() throws Exception {
+ Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
+ Text hostPort = new Text();
+ Text session = new Text();
+ CurrentLogsSection.getTabletServer(key, hostPort, session);
+ assertEquals("host:43861", hostPort.toString());
+ assertEquals("14a7df0e6420003", session.toString());
+ try {
+ Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
+ CurrentLogsSection.getTabletServer(bogus, hostPort, session);
+ fail("bad argument not thrown");
+ } catch (IllegalArgumentException ex) {
+ // expected: the bogus row starts with "~wal/" rather than "~wal+", so it must be rejected
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
new file mode 100644
index 0000000..91e5ee9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+public enum TabletLevel {
+  ROOT,
+  META,
+  NORMAL;
+
+  /** Classifies an extent: NORMAL unless it is a meta extent, ROOT for the root tablet, META otherwise. */
+  public static TabletLevel getLevel(KeyExtent extent) {
+    if (!extent.isMeta()) {
+      return NORMAL;
+    }
+    // only meta extents get here; the root tablet is singled out from them
+    return extent.isRootTablet() ? ROOT : META;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index c3595cd..4722e60 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -128,15 +128,12 @@ public class VolumeUtil {
switchedPath = le.filename;
ArrayList<String> switchedLogs = new ArrayList<String>();
- for (String log : le.logSet) {
- String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
- if (switchedLog != null) {
- switchedLogs.add(switchedLog);
- numSwitched++;
- } else {
- switchedLogs.add(log);
- }
-
+ String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
+ if (switchedLog != null) {
+ switchedLogs.add(switchedLog);
+ numSwitched++;
+ } else {
+ switchedLogs.add(le.filename);
}
if (numSwitched == 0) {
@@ -144,9 +141,7 @@ public class VolumeUtil {
return null;
}
- LogEntry newLogEntry = new LogEntry(le);
- newLogEntry.filename = switchedPath;
- newLogEntry.logSet = switchedLogs;
+ LogEntry newLogEntry = new LogEntry(le.extent, le.timestamp, le.server, switchedPath);
log.trace("Switched " + le + " to " + newLogEntry);
@@ -244,7 +239,7 @@ public class VolumeUtil {
log.debug("Tablet directory switched, need to record old log files " + logsToRemove + " " + ProtobufUtil.toString(status));
// Before deleting these logs, we need to mark them for replication
for (LogEntry logEntry : logsToRemove) {
- ReplicationTableUtil.updateFiles(context, extent, logEntry.logSet, status);
+ ReplicationTableUtil.updateFiles(context, extent, logEntry.filename, status);
}
}
}
@@ -253,7 +248,6 @@ public class VolumeUtil {
// method this should return the exact strings that are in the metadata table
return ret;
-
}
private static String decommisionedTabletDir(AccumuloServerContext context, ZooLock zooLock, VolumeManager vm, KeyExtent extent, String metaDir)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index c6f1dd8..9afb93f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -533,6 +533,7 @@ public class Initialize implements KeywordExecutable {
zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_CURRENT_LOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 7ee6f0c..c154bd0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -17,6 +17,9 @@
package org.apache.accumulo.server.master.state;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchWriter;
@@ -27,9 +30,14 @@ import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
public class MetaDataStateStore extends TabletStateStore {
+ private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
private static final int THREADS = 4;
private static final int LATENCY = 1000;
@@ -59,7 +67,7 @@ public class MetaDataStateStore extends TabletStateStore {
@Override
public ClosableIterator<TabletLocationState> iterator() {
- return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state);
+ return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state, targetTableName);
}
@Override
@@ -116,7 +124,7 @@ public class MetaDataStateStore extends TabletStateStore {
}
@Override
- public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+ public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
BatchWriter writer = createBatchWriter();
try {
@@ -124,6 +132,15 @@ public class MetaDataStateStore extends TabletStateStore {
Mutation m = new Mutation(tls.extent.getMetadataEntry());
if (tls.current != null) {
tls.current.clearLocation(m);
+ if (logsForDeadServers != null) {
+ List<Path> logs = logsForDeadServers.get(tls.current);
+ if (logs != null) {
+ for (Path log : logs) {
+ LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
+ m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+ }
+ }
+ }
}
if (tls.future != null) {
tls.future.clearFutureLocation(m);
@@ -145,4 +162,30 @@ public class MetaDataStateStore extends TabletStateStore {
public String name() {
return "Normal Tablets";
}
+
+ @Override
+ public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException {
+ BatchWriter writer = createBatchWriter();
+ try {
+ for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ continue;
+ }
+ Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
+ for (Path log : entry.getValue()) {
+ m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
+ }
+ writer.addMutation(m);
+ }
+ } catch (Exception ex) {
+ log.error("Error marking logs as unused: " + logs);
+ throw new DistributedStoreException(ex);
+ } finally {
+ try {
+ writer.close();
+ } catch (MutationsRejectedException e) {
+ throw new DistributedStoreException(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index d64c108..bec2dc4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -141,6 +141,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
boolean chopped = false;
for (Entry<Key,Value> entry : decodedRow.entrySet()) {
+
Key key = entry.getKey();
Text row = key.getRow();
Text cf = key.getColumnFamily();
@@ -173,8 +174,9 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
}
}
if (extent == null) {
- log.warn("No prev-row for key extent: " + decodedRow);
- return null;
+ String msg = "No prev-row for key extent " + decodedRow;
+ log.error(msg);
+ throw new BadLocationStateException(msg, k.getRow());
}
return new TabletLocationState(extent, future, current, last, walogs, chopped);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index fb30440..8116ecf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -68,6 +68,13 @@ public class TabletLocationState {
final public Collection<Collection<String>> walogs;
final public boolean chopped;
+  public TServerInstance futureOrCurrent() {
+    // The current location wins; the future location is only the fallback
+    // (and is returned as-is, whatever its value, when there is no current one).
+    final TServerInstance preferred = (current != null) ? current : future;
+    return preferred;
+  }
+
@Override
public String toString() {
return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? " chopped" : "");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index 5413e31..acc10d8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -18,8 +18,11 @@ package org.apache.accumulo.server.master.state;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.hadoop.fs.Path;
/**
* Interface for storing information about tablet assignments. There are three implementations:
@@ -56,10 +59,12 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
*
* @param tablets
* the tablets' current information
+ * @param logsForDeadServers
+ * a cache of logs in use by servers when they died
*/
- abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
+ abstract public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException;
- public static void unassign(AccumuloServerContext context, TabletLocationState tls) throws DistributedStoreException {
+ public static void unassign(AccumuloServerContext context, TabletLocationState tls, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
TabletStateStore store;
if (tls.extent.isRootTablet()) {
store = new ZooTabletStateStore();
@@ -68,7 +73,7 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
} else {
store = new MetaDataStateStore(context);
}
- store.unassign(Collections.singletonList(tls));
+ store.unassign(Collections.singletonList(tls), logsForDeadServers);
}
public static void setLocation(AccumuloServerContext context, Assignment assignment) throws DistributedStoreException {
@@ -83,4 +88,9 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
store.setLocations(Collections.singletonList(assignment));
}
+ /**
+ * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets.
+ */
+ abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance, List<Path>> logs) throws DistributedStoreException;
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index ab99396..bce20fd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -21,12 +21,17 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,10 +90,9 @@ public class ZooTabletStateStore extends TabletStateStore {
for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
if (logInfo != null) {
- LogEntry logEntry = new LogEntry();
- logEntry.fromBytes(logInfo);
- logs.add(logEntry.logSet);
- log.debug("root tablet logSet " + logEntry.logSet);
+ LogEntry logEntry = LogEntry.fromBytes(logInfo);
+ logs.add(Collections.singleton(logEntry.filename));
+ log.debug("root tablet log " + logEntry.filename);
}
}
TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
@@ -161,12 +165,28 @@ public class ZooTabletStateStore extends TabletStateStore {
}
@Override
- public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+ public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
if (tablets.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
TabletLocationState tls = tablets.iterator().next();
if (tls.extent.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet location");
+ if (logsForDeadServers != null) {
+ List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
+ if (logs != null) {
+ for (Path entry : logs) {
+ LogEntry logEntry = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), entry.toString());
+ byte[] value;
+ try {
+ value = logEntry.toBytes();
+ } catch (IOException ex) {
+ throw new DistributedStoreException(ex);
+ }
+ store.put(RootTable.ZROOT_TABLET_WALOGS + "/" + logEntry.getUniqueID(), value);
+ store.remove(RootTable.ZROOT_TABLET_CURRENT_LOGS + "/" + MetadataSchema.CurrentLogsSection.getRowPrefix() + tls.current.toString() + logEntry.getUniqueID());
+ }
+ }
+ }
store.remove(RootTable.ZROOT_TABLET_LOCATION);
store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
log.debug("unassign root tablet location");
@@ -177,4 +197,9 @@ public class ZooTabletStateStore extends TabletStateStore {
return "Root Table";
}
+ @Override
+ public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) {
+ // the root table is not replicated, so unassigning the root tablet has removed the current log marker
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
index 898e3d4..d72eea2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@ -153,6 +153,19 @@ public class StatusUtil {
/**
* @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
*/
+ public static Status openWithUnknownLength(long timeCreated) {
+ Builder builder = Status.newBuilder();
+ builder.setBegin(0);
+ builder.setEnd(0);
+ builder.setInfiniteEnd(true);
+ builder.setClosed(false);
+ builder.setCreatedTime(timeCreated);
+ return builder.build();
+ }
+
+ /**
+ * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
+ */
public static Status openWithUnknownLength() {
return INF_END_REPLICATION_STATUS;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index e90d1dd..9e3fc7d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
/**
*
@@ -61,9 +62,6 @@ public class ListVolumesUsed {
private static void getLogURIs(TreeSet<String> volumes, LogEntry logEntry) {
volumes.add(getLogURI(logEntry.filename));
- for (String logSet : logEntry.logSet) {
- volumes.add(getLogURI(logSet));
- }
}
private static void listZookeeper() throws Exception {
@@ -123,6 +121,20 @@ public class ListVolumesUsed {
for (String volume : volumes)
System.out.println("\tVolume : " + volume);
+
+ volumes.clear();
+ scanner.clearColumns();
+ scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
+ Text path = new Text();
+ for (Entry<Key,Value> entry : scanner) {
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ volumes.add(getLogURI(path.toString()));
+ }
+
+ System.out.println("Listing volumes referenced in " + name + " current logs section");
+
+ for (String volume : volumes)
+ System.out.println("\tVolume : " + volume);
}
public static void listVolumes(ClientContext context) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 14eba68..4a5650e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -248,35 +248,27 @@ public class MasterMetadataUtil {
if (unusedWalLogs != null) {
updateRootTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
}
-
return;
}
-
Mutation m = getUpdateForTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
-
MetadataTableUtil.update(context, zooLock, m, extent);
-
}
/**
* Update the data file for the root tablet
*/
- protected static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
+ private static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
IZooReaderWriter zk = ZooReaderWriter.getInstance();
- // unusedWalLogs will contain the location/name of each log in a log set
- // the log set is stored under one of the log names, but not both
- // find the entry under one of the names and delete it.
String root = MetadataTableUtil.getZookeeperLogLocation();
- boolean foundEntry = false;
for (String entry : unusedWalLogs) {
String[] parts = entry.split("/");
String zpath = root + "/" + parts[parts.length - 1];
while (true) {
try {
if (zk.exists(zpath)) {
+ log.debug("Removing WAL reference for root table " + zpath);
zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP);
- foundEntry = true;
}
break;
} catch (KeeperException e) {
@@ -287,16 +279,15 @@ public class MasterMetadataUtil {
UtilWaitThread.sleep(1000);
}
}
- if (unusedWalLogs.size() > 0 && !foundEntry)
- log.warn("WALog entry for root tablet did not exist " + unusedWalLogs);
}
+
/**
* Create an update that updates a tablet
*
* @return A Mutation to update a tablet from the given information
*/
- protected static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
+ private static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
Mutation m = new Mutation(extent.getMetadataEntry());
@@ -324,6 +315,7 @@ public class MasterMetadataUtil {
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m, new Value(Long.toString(flushId).getBytes(UTF_8)));
+
return m;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 5e74aac..4470c55 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -23,8 +23,6 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -61,6 +59,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -82,10 +81,12 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -121,7 +122,7 @@ public class MetadataTableUtil {
return metadataTable;
}
- private synchronized static Writer getRootTable(ClientContext context) {
+ public synchronized static Writer getRootTable(ClientContext context) {
Credentials credentials = context.getCredentials();
Writer rootTable = root_tables.get(credentials);
if (rootTable == null) {
@@ -223,7 +224,7 @@ public class MetadataTableUtil {
// add before removing in case of process death
for (LogEntry logEntry : logsToAdd)
- addLogEntry(context, logEntry, zooLock);
+ addRootLogEntry(context, zooLock, logEntry);
removeUnusedWALEntries(context, extent, logsToRemove, zooLock);
} else {
@@ -248,6 +249,35 @@ public class MetadataTableUtil {
}
}
+ private static interface ZooOperation {
+ void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException;
+ }
+
+ private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) {
+ while (true) {
+ try {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ if (zoo.isLockHeld(zooLock.getLockID())) {
+ op.run(zoo);
+ }
+ break;
+ } catch (Exception e) {
+ log.error("Unexpected exception {}", e.getMessage(), e);
+ }
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+ private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) {
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = getZookeeperLogLocation();
+ rw.putPersistentData(root + "/" + entry.getUniqueID(), entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+ }
+ });
+ }
+
public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
@@ -447,34 +477,6 @@ public class MetadataTableUtil {
return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
}
- public static void addLogEntry(ClientContext context, LogEntry entry, ZooLock zooLock) {
- if (entry.extent.isRootTablet()) {
- String root = getZookeeperLogLocation();
- while (true) {
- try {
- IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- if (zoo.isLockHeld(zooLock.getLockID())) {
- String[] parts = entry.filename.split("/");
- String uniqueId = parts[parts.length - 1];
- zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
- }
- break;
- } catch (KeeperException e) {
- log.error("{}", e.getMessage(), e);
- } catch (InterruptedException e) {
- log.error("{}", e.getMessage(), e);
- } catch (IOException e) {
- log.error("{}", e.getMessage(), e);
- }
- UtilWaitThread.sleep(1000);
- }
- } else {
- Mutation m = new Mutation(entry.getRow());
- m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
- update(context, zooLock, m, entry.extent);
- }
- }
-
public static void setRootTabletDir(String dir) throws IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
@@ -565,22 +567,11 @@ public class MetadataTableUtil {
}
}
- Collections.sort(result, new Comparator<LogEntry>() {
- @Override
- public int compare(LogEntry o1, LogEntry o2) {
- long diff = o1.timestamp - o2.timestamp;
- if (diff < 0)
- return -1;
- if (diff > 0)
- return 1;
- return 0;
- }
- });
log.info("Returning logs " + result + " for extent " + extent);
return result;
}
- static void getRootLogEntries(ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
+ static void getRootLogEntries(final ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String root = getZookeeperLogLocation();
// there's a little race between getting the children and fetching
@@ -588,11 +579,10 @@ public class MetadataTableUtil {
while (true) {
result.clear();
for (String child : zoo.getChildren(root)) {
- LogEntry e = new LogEntry();
try {
- e.fromBytes(zoo.getData(root + "/" + child, null));
+ LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null));
// upgrade from !0;!0<< -> +r<<
- e.extent = RootTable.EXTENT;
+ e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename);
result.add(e);
} catch (KeeperException.NoNodeException ex) {
continue;
@@ -662,28 +652,23 @@ public class MetadataTableUtil {
return new LogEntryIterator(context);
}
- public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) {
+ public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) {
if (extent.isRootTablet()) {
- for (LogEntry entry : logEntries) {
- String root = getZookeeperLogLocation();
- while (true) {
- try {
- IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- if (zoo.isLockHeld(zooLock.getLockID())) {
- String parts[] = entry.filename.split("/");
- zoo.recursiveDelete(root + "/" + parts[parts.length - 1], NodeMissingPolicy.SKIP);
- }
- break;
- } catch (Exception e) {
- log.error("{}", e.getMessage(), e);
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = getZookeeperLogLocation();
+ for (LogEntry entry : entries) {
+ String path = root + "/" + entry.getUniqueID();
+ log.debug("Removing " + path + " from zookeeper");
+ rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
}
- UtilWaitThread.sleep(1000);
}
- }
+ });
} else {
Mutation m = new Mutation(extent.getMetadataEntry());
- for (LogEntry entry : logEntries) {
- m.putDelete(LogColumnFamily.NAME, new Text(entry.getName()));
+ for (LogEntry entry : entries) {
+ m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier());
}
update(context, zooLock, m, extent);
}
@@ -1068,4 +1053,130 @@ public class MetadataTableUtil {
return tabletEntries;
}
+ public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename, TabletLevel level) {
+ log.debug("Adding log entry " + filename);
+ if (level == TabletLevel.ROOT) {
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+ String uniqueId = filename.getName();
+ StringBuilder path = new StringBuilder(root);
+ path.append("/");
+ path.append(CurrentLogsSection.getRowPrefix());
+ path.append(tabletSession.toString());
+ path.append(uniqueId);
+ rw.putPersistentData(path.toString(), filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+ }
+ });
+ } else {
+ Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+ m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new Value(EMPTY_BYTES));
+ String tableName = MetadataTable.NAME;
+ if (level == TabletLevel.META) {
+ tableName = RootTable.NAME;
+ }
+ BatchWriter bw = null;
+ try {
+ bw = context.getConnector().createBatchWriter(tableName, null);
+ bw.addMutation(m);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (bw != null) {
+ try {
+ bw.close();
+ } catch (Exception e2) {
+ throw new RuntimeException(e2);
+ }
+ }
+ }
+ }
+ }
+
+ private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename) {
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+ String uniqueId = filename.getName();
+ String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
+ log.debug("Removing entry " + path + " from zookeeper");
+ rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
+ }
+ });
+ }
+
+ public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
+ // A marker may exist at the root and/or metadata level: delete it from the root table and ZooKeeper, and flag it UNUSED in the metadata table
+ try {
+ BatchWriter root = null;
+ BatchWriter meta = null;
+ try {
+ root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+ meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+ for (Path fname : all) {
+ Text tname = new Text(fname.toString());
+ Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+ m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
+ root.addMutation(m);
+ log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
+ m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+ m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
+ meta.addMutation(m);
+ removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+ }
+ } finally {
+ if (root != null) {
+ root.close();
+ }
+ if (meta != null) {
+ meta.close();
+ }
+ }
+ } catch (Exception ex) {
+ throw new AccumuloException(ex);
+ }
+ }
+
+ public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<Path>> logsForDeadServers)
+ throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ // already cached
+ if (logsForDeadServers.containsKey(server)) {
+ return;
+ }
+ if (extent.isRootTablet()) {
+ final List<Path> logs = new ArrayList<>();
+ retryZooKeeperUpdate(context, lock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+ logs.clear();
+ for (String child : rw.getChildren(root)) {
+ logs.add(new Path(new String(rw.getData(root + "/" + child, null), UTF_8)));
+ }
+ }
+ });
+ logsForDeadServers.put(server, logs);
+ } else {
+ // use the correct meta table
+ String table = MetadataTable.NAME;
+ if (extent.isMeta()) {
+ table = RootTable.NAME;
+ }
+ // fetch the current logs in use, and put them in the cache
+ Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
+ scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
+ List<Path> logs = new ArrayList<>();
+ Text path = new Text();
+ for (Entry<Key,Value> entry : scanner) {
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
+ logs.add(new Path(path.toString()));
+ }
+ }
+ logsForDeadServers.put(server, logs);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 8e755a3..c6d5ce4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.server.util;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -176,20 +175,14 @@ public class ReplicationTableUtil {
/**
* Write replication ingest entries for each provided file with the given {@link Status}.
*/
- public static void updateFiles(ClientContext context, KeyExtent extent, Collection<String> files, Status stat) {
+ public static void updateFiles(ClientContext context, KeyExtent extent, String file, Status stat) {
if (log.isDebugEnabled()) {
- log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat));
+ log.debug("Updating replication status for " + extent + " with " + file + " using " + ProtobufUtil.toString(stat));
}
// TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294
- if (files.isEmpty()) {
- return;
- }
Value v = ProtobufUtil.toValue(stat);
- for (String file : files) {
- // TODO Can preclude this addition if the extent is for a table we don't need to replicate
- update(context, createUpdateMutation(new Path(file), v, extent), extent);
- }
+ update(context, createUpdateMutation(new Path(file), v, extent), extent);
}
static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 3983bde..04a83d3 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -91,7 +91,7 @@ public class ReplicationTableUtilTest {
String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid;
long createdTime = System.currentTimeMillis();
- ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime));
+ ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), myFile, StatusUtil.fileCreated(createdTime));
verify(writer);