You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/05/12 18:42:36 UTC
[1/2] accumulo git commit: ACCUMULO-3327 clean up book-keeping periodically, checking against zookeeper
Repository: accumulo
Updated Branches:
refs/heads/master 0b7d00db8 -> e2e5afb2a
ACCUMULO-3327 clean up book-keeping periodically, checking against zookeeper
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8efc9546
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8efc9546
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8efc9546
Branch: refs/heads/master
Commit: 8efc9546400f9b76afc3bfad93046a487c147e82
Parents: 8ccd7e7
Author: Eric C. Newton <er...@gmail.com>
Authored: Tue May 12 12:42:14 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Tue May 12 12:42:14 2015 -0400
----------------------------------------------------------------------
.../server/zookeeper/TransactionWatcher.java | 21 +++++++
.../apache/accumulo/tserver/TabletServer.java | 4 ++
.../tserver/tablet/BulkImportCacheCleaner.java | 60 ++++++++++++++++++++
.../apache/accumulo/tserver/tablet/Tablet.java | 16 +++++-
.../performance/metadata/FastBulkImportIT.java | 2 +-
5 files changed, 99 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
index 0e1cdfd..da94a3c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
@@ -16,8 +16,13 @@
*/
package org.apache.accumulo.server.zookeeper;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReader;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -59,6 +64,22 @@ public class TransactionWatcher extends org.apache.accumulo.fate.zookeeper.Trans
writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", NodeMissingPolicy.SKIP);
}
+ public static Set<Long> allTransactionsAlive(String type) throws KeeperException, InterruptedException {
+ final Instance instance = HdfsZooInstance.getInstance();
+ final IZooReader reader = ZooReaderWriter.getInstance();
+ final Set<Long> result = new HashSet<>();
+ final String parent = ZooUtil.getRoot(instance) + "/" + type;
+ reader.sync(parent);
+ List<String> children = reader.getChildren(parent);
+ for (String child : children) {
+ if (child.endsWith("-running")) {
+ continue;
+ }
+ result.add(Long.parseLong(child));
+ }
+ return result;
+ }
+
@Override
public boolean transactionComplete(String type, long tid) throws Exception {
String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6f2c9a2..7154732 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -223,6 +223,7 @@ import org.apache.accumulo.tserver.session.ScanSession;
import org.apache.accumulo.tserver.session.Session;
import org.apache.accumulo.tserver.session.SessionManager;
import org.apache.accumulo.tserver.session.UpdateSession;
+import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.accumulo.tserver.tablet.CompactionInfo;
import org.apache.accumulo.tserver.tablet.CompactionWatcher;
@@ -2423,6 +2424,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
};
SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000);
+ final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
+ SimpleTimer.getInstance(aconf).schedule(new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS);
+
HostAndPort masterHost;
while (!serverStopRequested) {
// send all of the pending messages
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
new file mode 100644
index 0000000..fff2be2
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BulkImportCacheCleaner implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(BulkImportCacheCleaner.class);
+ private final TabletServer server;
+
+ public BulkImportCacheCleaner(TabletServer server) {
+ this.server = server;
+ }
+
+ @Override
+ public void run() {
+ // gather the list of transactions the tablets have cached
+ final Set<Long> tids = new HashSet<>();
+ for (Tablet tablet : server.getOnlineTablets()) {
+ tids.addAll(tablet.getBulkIngestedFiles().keySet());
+ }
+ try {
+ // get the current transactions from ZooKeeper
+ final Set<Long> allTransactionsAlive = ZooArbitrator.allTransactionsAlive(Constants.BULK_ARBITRATOR_TYPE);
+ // remove any that are still alive
+ tids.removeAll(allTransactionsAlive);
+ // cleanup any memory of these transactions
+ for (Tablet tablet : server.getOnlineTablets()) {
+ tablet.cleanupBulkLoadedFiles(tids);
+ }
+ } catch (KeeperException | InterruptedException e) {
+ // we'll just clean it up again later
+ log.debug("Error reading bulk import live transactions {}", e.toString());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 5de3236..7eb2069 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -41,7 +41,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -253,7 +253,7 @@ public class Tablet implements TabletCommitter {
private final ConfigurationObserver configObserver;
- private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().expireAfterAccess(4, TimeUnit.HOURS).build();
+ private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().build();
private final int logId;
@@ -586,7 +586,7 @@ public class Tablet implements TabletCommitter {
// Force a load of any per-table properties
configObserver.propertiesChanged();
for (Long key : bulkImported.keys()) {
- this.bulkImported.put(key, new ArrayList<FileRef>(bulkImported.get(key)));
+ this.bulkImported.put(key, new CopyOnWriteArrayList<FileRef>(bulkImported.get(key)));
}
if (!logEntries.isEmpty()) {
@@ -2792,4 +2792,14 @@ public class Tablet implements TabletCommitter {
}
}
+ public Map<Long, List<FileRef>> getBulkIngestedFiles() {
+ return new HashMap<Long, List<FileRef>>(bulkImported.asMap());
+ }
+
+ public void cleanupBulkLoadedFiles(Set<Long> tids) {
+ for (Long tid : tids) {
+ bulkImported.invalidate(tid);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
index 05c907c..5f670cc 100644
--- a/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
@@ -68,7 +68,7 @@ public class FastBulkImportIT extends ConfigurableMacIT {
}
c.tableOperations().addSplits(tableName, splits);
- log.info("Creating bulk import files");
+ log.info("Creating lots of bulk import files");
FileSystem fs = getCluster().getFileSystem();
Path basePath = getCluster().getTemporaryPath();
CachedConfiguration.setInstance(fs.getConf());
[2/2] accumulo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/accumulo
Posted by ec...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/accumulo
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e2e5afb2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e2e5afb2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e2e5afb2
Branch: refs/heads/master
Commit: e2e5afb2a86c3bc74eed7a0aab21291c941ded2e
Parents: 8efc954 0b7d00d
Author: Eric C. Newton <er...@gmail.com>
Authored: Tue May 12 12:42:21 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Tue May 12 12:42:21 2015 -0400
----------------------------------------------------------------------
CHANGES | 3 +
DEPENDENCIES | 21 ++++
UPGRADING.md | 106 +++++++++++++++++
pom.xml | 2 +-
.../server/GarbageCollectionLogger.java | 116 +++++++++++++++++++
.../server/GarbageCollectionLogger.java | 116 -------------------
6 files changed, 247 insertions(+), 117 deletions(-)
----------------------------------------------------------------------