You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:55:30 UTC
[30/51] [abbrv] git commit: Merge remote-tracking branch
'origin/master' into ACCUMULO-378
Merge remote-tracking branch 'origin/master' into ACCUMULO-378
Conflicts:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e81eee7f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e81eee7f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e81eee7f
Branch: refs/heads/master
Commit: e81eee7f7cd2641ffdace5af48a5027f7fcce620
Parents: 73d34ec f280e97
Author: Josh Elser <el...@apache.org>
Authored: Tue Jun 3 21:38:00 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Jun 3 21:38:00 2014 -0400
----------------------------------------------------------------------
.../accumulo/core/client/ZooKeeperInstance.java | 7 +-
.../core/client/impl/ConditionalWriterImpl.java | 4 +-
.../client/impl/InstanceOperationsImpl.java | 3 +-
.../accumulo/core/client/impl/Namespaces.java | 3 +-
.../core/client/impl/RootTabletLocator.java | 14 +-
.../accumulo/core/client/impl/ServerClient.java | 15 +-
.../accumulo/core/client/impl/Tables.java | 3 +-
.../core/client/impl/ZookeeperLockChecker.java | 10 +-
.../org/apache/accumulo/core/conf/Property.java | 8 +-
.../iterators/conf/ColumnToClassMapping.java | 1 +
.../accumulo/core/util/AsyncSocketAppender.java | 3 +-
.../core/client/ZooKeeperInstanceTest.java | 144 +
.../core/client/impl/RootTabletLocatorTest.java | 61 +
.../client/impl/ZookeeperLockCheckerTest.java | 58 +
.../core/util/AsyncSocketAppenderTest.java | 8 +-
docs/src/main/asciidoc/chapters/replication.txt | 21 +
.../accumulo/fate/zookeeper/ZooCache.java | 16 +-
.../fate/zookeeper/ZooCacheFactory.java | 78 +
.../apache/accumulo/fate/zookeeper/ZooLock.java | 2 +-
.../fate/zookeeper/ZooReaderWriter.java | 4 -
.../fate/zookeeper/ZooCacheFactoryTest.java | 87 +
.../accumulo/server/client/HdfsZooInstance.java | 3 +-
.../server/conf/NamespaceConfiguration.java | 3 +-
.../server/conf/TableConfiguration.java | 3 +-
.../accumulo/server/conf/ZooConfiguration.java | 5 +-
.../accumulo/server/tablets/TabletTime.java | 1 -
.../zookeeper/ZooReaderWriterFactory.java | 2 -
.../apache/accumulo/server/AccumuloTest.java | 1 -
.../server/watcher/MonitorLog4jWatcherTest.java | 8 +-
.../accumulo/tserver/CompactionStats.java | 59 -
.../accumulo/tserver/CompactionWatcher.java | 110 -
.../org/apache/accumulo/tserver/Compactor.java | 548 ---
.../apache/accumulo/tserver/FileManager.java | 12 +-
.../apache/accumulo/tserver/InMemoryMap.java | 2 +-
.../accumulo/tserver/MinorCompactionReason.java | 21 +
.../apache/accumulo/tserver/MinorCompactor.java | 146 -
.../java/org/apache/accumulo/tserver/Rate.java | 60 -
.../org/apache/accumulo/tserver/RootFiles.java | 133 -
.../tserver/TConstraintViolationException.java | 54 +
.../org/apache/accumulo/tserver/Tablet.java | 3856 ------------------
.../tserver/TabletIteratorEnvironment.java | 8 +-
.../apache/accumulo/tserver/TabletServer.java | 83 +-
.../tserver/TabletServerResourceManager.java | 67 +-
.../accumulo/tserver/TabletStatsKeeper.java | 6 +
.../apache/accumulo/tserver/log/DfsLogger.java | 60 +-
.../accumulo/tserver/log/LocalWALRecovery.java | 14 +-
.../tserver/log/TabletServerLogger.java | 4 +-
.../apache/accumulo/tserver/tablet/Batch.java | 51 +
.../accumulo/tserver/tablet/CommitSession.java | 121 +
.../accumulo/tserver/tablet/CompactionInfo.java | 129 +
.../tserver/tablet/CompactionRunner.java | 76 +
.../tserver/tablet/CompactionStats.java | 59 +
.../tserver/tablet/CompactionWatcher.java | 110 +
.../accumulo/tserver/tablet/Compactor.java | 424 ++
.../tserver/tablet/CountingIterator.java | 78 +
.../tserver/tablet/DatafileManager.java | 605 +++
.../apache/accumulo/tserver/tablet/KVEntry.java | 39 +
.../tserver/tablet/MinorCompactionTask.java | 99 +
.../accumulo/tserver/tablet/MinorCompactor.java | 142 +
.../apache/accumulo/tserver/tablet/Rate.java | 60 +
.../accumulo/tserver/tablet/RootFiles.java | 133 +
.../accumulo/tserver/tablet/ScanBatch.java | 37 +
.../accumulo/tserver/tablet/ScanDataSource.java | 222 +
.../accumulo/tserver/tablet/ScanOptions.java | 82 +
.../apache/accumulo/tserver/tablet/Scanner.java | 136 +
.../accumulo/tserver/tablet/SplitInfo.java | 76 +
.../accumulo/tserver/tablet/SplitRowSpec.java | 29 +
.../apache/accumulo/tserver/tablet/Tablet.java | 2581 ++++++++++++
.../tserver/tablet/TabletClosedException.java | 29 +
.../tserver/tablet/TabletCommitter.java | 51 +
.../accumulo/tserver/tablet/TabletMemory.java | 190 +
.../accumulo/tserver/CountingIteratorTest.java | 2 +-
.../apache/accumulo/tserver/RootFilesTest.java | 149 -
.../accumulo/tserver/tablet/RootFilesTest.java | 150 +
.../test/functional/MonitorLoggingIT.java | 1 -
test/system/continuous/master-agitator.pl | 3 +-
76 files changed, 6430 insertions(+), 5253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 8ad849b,1200fd1..59955f3
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -454,26 -441,6 +454,26 @@@ public enum Property
GENERAL_MAVEN_PROJECT_BASEDIR(AccumuloClassLoader.MAVEN_PROJECT_BASEDIR_PROPERTY_NAME, AccumuloClassLoader.DEFAULT_MAVEN_PROJECT_BASEDIR_VALUE,
PropertyType.ABSOLUTEPATH, "Set this to automatically add maven target/classes directories to your dynamic classpath"),
+ // General properties for configuring replication
+ REPLICATION_PREFIX("replication.", null, PropertyType.PREFIX, "Properties in this category affect the replication of data to other Accumulo instances."),
+ REPLICATION_PEERS("replication.peer.", null, PropertyType.PREFIX, "Properties in this category control what systems data can be replicated to"),
+ REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX, "The username to provide when authenticating with the given peer"),
+ @Sensitive
+ REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"),
+ REPLICATION_NAME("replication.name", "", PropertyType.STRING, "Name of this cluster with respect to replication. Used to identify this instance from other peers"),
+ REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "1000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),
+ REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"),
+ REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
+ REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"),
+ REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, "Number of attempts to try to replicate some data before giving up and letting it naturally be retried later"),
- REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replciation"),
- REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."),
++ REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replication"),
++ REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "30s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."),
+ REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"),
+ REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.SequentialWorkAssigner", PropertyType.CLASSNAME,
+ "Replication WorkAssigner implementation to use"),
- REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work"),
- REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work"),
++ REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work, not useful outside of tests"),
++ REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work, not useful outside of tests"),
+
;
private String key, defaultValue, description;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/docs/src/main/asciidoc/chapters/replication.txt
----------------------------------------------------------------------
diff --cc docs/src/main/asciidoc/chapters/replication.txt
index 9f367df,0000000..dc87b62
mode 100644,000000..100644
--- a/docs/src/main/asciidoc/chapters/replication.txt
+++ b/docs/src/main/asciidoc/chapters/replication.txt
@@@ -1,184 -1,0 +1,205 @@@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+== Replication
+
+=== Overview
+
+Replication is a feature of Accumulo which provides a mechanism to automatically
+copy data to other systems, typically for the purpose of disaster recovery,
+high availability, or geographic locality. It is best to consider this feature
+as a framework for automatic replication instead of the ability to copy data
+from one Accumulo instance to another, as copying to another Accumulo cluster is
+only an implementation detail. The local Accumulo cluster is hereby referred
+to as the +primary+ while systems being replicated to are known as
++peers+.
+
+This replication framework makes two Accumulo instances, where one instance
+replicates to another, eventually consistent between one another, as opposed
+to the strong consistency that each single Accumulo instance still holds. That
+is to say, attempts to read data from a table on a peer which has pending replication
+from the primary will not wait for that data to be replicated before running the scan.
+This is desirable for a number of reasons, the most important being that the replication
+framework is not limited by network outages or offline peers, but only by the HDFS
+space available on the primary system.
+
+Replication configurations can be considered as a directed graph which allows cycles.
+The systems from which data was replicated are maintained in each Mutation, which
+allows each system to determine if a peer already has the data that
+the system wants to send.
+
+Data is replicated by using the Write-Ahead logs (WAL) that each TabletServer is
+already maintaining. TabletServers record which WALs have data that needs to be
+replicated to the +accumulo.metadata+ table. The Master uses these records,
+combined with the local Accumulo table that the WAL was used with, to create records
+in the +replication+ table which track which peers the given WAL should be
+replicated to. The Master later uses these work entries to assign the actual
+replication task to a local TabletServer using ZooKeeper. A TabletServer will get
+a lock in ZooKeeper for the replication of this file to a peer, and proceed to
+replicate to the peer, recording progress in the +replication+ table as
+data is successfully replicated on the peer. Later, the Master and Garbage Collector
+will remove records from the +accumulo.metadata+ and +replication+ tables
+and files from HDFS, respectively, after replication to all peers is complete.
+
+=== Configuration
+
+Configuration of Accumulo to replicate data to another system can be categorized
+into the following sections.
+
+==== Site Configuration
+
+Each system involved in replication (even the primary) needs a name that uniquely
+identifies it across all peers in the replication graph. This should be considered
+fixed for an instance, and set in +accumulo-site.xml+.
+
+----
+<property>
+ <name>replication.name</name>
+ <value>primary</value>
+ <description>Unique name for this system used by replication</description>
+</property>
+----
+
+==== Instance Configuration
+
+For each peer of this system, Accumulo needs to know the name of that peer,
+the class used to replicate data to that system and some configuration information
+to connect to this remote peer. In the case of Accumulo, this additional data
+is the Accumulo instance name and ZooKeeper quorum; however, this varies on the
+replication implementation for the peer.
+
+These can be set in the site configuration to ease deployments; however, as they may
+change, it can be useful to set this information using the Accumulo shell.
+
+To configure a peer with the name +peer1+ which is an Accumulo system with an instance name of +accumulo_peer+
+and a ZooKeeper quorum of +10.0.0.1,10.0.2.1,10.0.3.1+, invoke the following
+command in the shell.
+
+----
+root@accumulo_primary> config -s
+replication.peer.peer1=org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,accumulo_peer,10.0.0.1,10.0.2.1,10.0.3.1
+----
+
+Since this is an Accumulo system, we also want to set a username and password
+to use when authenticating with this peer. On our peer, we make a special user
+which has permission to write to the tables we want to replicate data into, "replication"
+with a password of "password". We then need to record this in the primary's configuration.
+
+----
+root@accumulo_primary> config -s replication.peer.user.peer1=replication
+root@accumulo_primary> config -s replication.peer.password.peer1=password
+----
+
+==== Table Configuration
+
+Now, we presently have a peer defined, so we just need to configure which tables will
+replicate to that peer. We also need to configure an identifier to determine where
+this data will be replicated on the peer. Since we're replicating to another Accumulo
+cluster, this is a table ID. In this example, we want to enable replication on
++my_table+ and configure our peer +accumulo_peer+ as a target, sending
+the data to the table with an ID of +2+ in +accumulo_peer+.
+
+----
+root@accumulo_primary> config -t my_table -s table.replication=true
+root@accumulo_primary> config -t my_table -s table.replication.target.accumulo_peer=2
+----
+
+To replicate a single table on the primary to multiple peers, the second command
+in the above shell snippet can be issued for each peer and remote identifier pair.
+
+=== Monitoring
+
+Basic information about replication status from a primary can be found on the Accumulo
+Monitor server, using the +Replication+ link in the sidebar.
+
+On this page, information is broken down into the following sections:
+
+1. Files pending replication by peer and target
+2. Files queued for replication, with progress made
+
+=== Work Assignment
+
+Depending on the schema of a table, different implementations of the WorkAssigner could
+be configured. The implementation is controlled via the property +replication.work.assigner+
+and the full class name for the implementation. This can be configured via the shell or
++accumulo-site.xml+.
+
+----
+<property>
+ <name>replication.work.assigner</name>
+ <value>org.apache.accumulo.master.replication.SequentialWorkAssigner</value>
+ <description>Implementation used to assign work for replication</description>
+</property>
+----
+
+----
+root@accumulo_primary> config -s replication.work.assigner=org.apache.accumulo.master.replication.SequentialWorkAssigner
+----
+
+Two implementations are provided. By default, the +SequentialWorkAssigner+ is configured for an
+instance. The SequentialWorkAssigner ensures that, per peer and each remote identifier, WALs are
+replicated in the order in which they were created. This is sufficient to ensure that updates to a table
+will be replayed in the correct order on the peer. This implementation has the downside of only replicating
+a single WAL at a time.
+
+The second implementation, the +UnorderedWorkAssigner+, can be used to overcome the limitation
+of only a single WAL being replicated to a target and peer at any time. Depending on the table schema,
+it's possible that multiple versions of the same Key with different values are infrequent or nonexistent.
+In this case, parallel replication to a peer and target is possible without any downsides. In the case
+where this implementation is used where column updates are frequent, it is possible that there will be
+an inconsistency between the primary and the peer.
+
+=== ReplicaSystems
+
++ReplicaSystem+ is the interface which allows abstraction of replication of data
+to peers of various types. Presently, only an +AccumuloReplicaSystem+ is provided
+which will replicate data to another Accumulo instance. A +ReplicaSystem+ implementation
+is run inside of the TabletServer process, and can be configured as mentioned in the
++Instance Configuration+ section of this document. Theoretically, an implementation
+of this interface could send data to other filesystems, databases, etc.
+
+==== AccumuloReplicaSystem
+
+The +AccumuloReplicaSystem+ uses Thrift to communicate with a peer Accumulo instance
+and replicate the necessary data. The TabletServer running on the primary will communicate
+with the Master on the peer to request the address of a TabletServer on the peer which
+this TabletServer will use to replicate the data.
+
+The TabletServer on the primary will then replicate data in batches of a configurable
+size (+replication.max.unit.size+). The TabletServer on the peer will report how many
+records were applied back to the primary, which will be used to record how many records
+were successfully replicated. The TabletServer on the primary will continue to replicate
+data in these batches until no more data can be read from the file.
++
++=== Other Configuration
++
++There are a number of configuration values that can be used to control how
++the implementation of various components operates.
++
++[width="75%",cols=">,^2,^2"]
++[options="header"]
++|====
++|Property | Description | Default
++|replication.max.work.queue | Maximum number of files queued for replication at one time | 1000
++|replication.work.assignment.sleep | Time between invocations of the WorkAssigner | 30s
++|replication.worker.threads | Size of threadpool used to replicate data to peers | 4
++|replication.receipt.service.port | Thrift service port to listen for replication requests, can use '0' for a random port | 10002
++|replication.work.attempts | Number of attempts to replicate to a peer before aborting the attempt | 10
++|replication.receiver.min.threads | Minimum number of idle threads for handling incoming replication | 1
++|replication.receiver.threadcheck.time | Time between attempting adjustments of thread pool for incoming replications | 30s
++|replication.max.unit.size | Maximum amount of data to be replicated in one RPC | 64M
++|replication.work.assigner | Work Assigner implementation | org.apache.accumulo.master.replication.SequentialWorkAssigner
++|tserver.replication.batchwriter.replayer.memory| Size of BatchWriter cache to use in applying replication requests | 50M
++|====
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e4c7ef9,2a453a8..689557c
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -223,8 -211,17 +213,19 @@@ import org.apache.accumulo.tserver.metr
import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
+import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
+import org.apache.accumulo.tserver.replication.ReplicationWorker;
+ import org.apache.accumulo.tserver.tablet.CommitSession;
+ import org.apache.accumulo.tserver.tablet.CompactionInfo;
+ import org.apache.accumulo.tserver.tablet.CompactionWatcher;
+ import org.apache.accumulo.tserver.tablet.Compactor;
+ import org.apache.accumulo.tserver.tablet.KVEntry;
+ import org.apache.accumulo.tserver.tablet.Tablet.LookupResult;
+ import org.apache.accumulo.tserver.tablet.ScanBatch;
+ import org.apache.accumulo.tserver.tablet.Scanner;
+ import org.apache.accumulo.tserver.tablet.SplitInfo;
+ import org.apache.accumulo.tserver.tablet.Tablet;
+ import org.apache.accumulo.tserver.tablet.TabletClosedException;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
@@@ -3121,30 -3099,7 +3122,30 @@@ public class TabletServer extends Abstr
return address;
}
+ private HostAndPort startReplicationService() throws UnknownHostException { // Starts the replication service for this tserver, advertises its address in ZooKeeper, and returns that address.
+ ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance())); // handler is wrapped by TraceWrap before being handed to the processor
+ ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
+ AccumuloConfiguration conf = getSystemConfiguration();
+ Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); // use the tserver-specific limit when it has a value, otherwise the general one
+ ServerAddress sp = TServerUtils.startServer(conf, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, // binds on the same host text as the tserver client address
+ "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
+ this.replServer = sp.server; // keep the server handle in a field
+ log.info("Started replication service on " + sp.address);
+
+ try {
+ // The replication service is unique to the thrift service for a tserver, not just a host.
+ // Advertise the host and port for replication service given the host and port for the tserver.
+ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(), // node name is the tserver client address
+ sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE); // node data is the replication service address; an existing node is overwritten
+ } catch (Exception e) {
+ log.error("Could not advertise replication service port", e);
+ throw new RuntimeException(e); // failure to advertise is rethrown unchecked with the cause preserved
+ }
+
+ return sp.address;
+ }
+
- ZooLock getLock() {
+ public ZooLock getLock() {
return tabletServerLock;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index b7b0aff,9fec437..b4f14ec
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -41,9 -36,6 +41,8 @@@ import org.apache.accumulo.core.replica
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.util.ReplicationTableUtil;
- import org.apache.accumulo.tserver.Tablet.CommitSession;
import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 0000000,2771db9..5b46b7b
mode 000000,100644..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@@ -1,0 -1,581 +1,605 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.accumulo.tserver.tablet;
+
+ import java.io.IOException;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
++import java.util.Map.Entry;
+ import java.util.Set;
+ import java.util.SortedMap;
+ import java.util.TreeMap;
+ import java.util.TreeSet;
-import java.util.Map.Entry;
+
+ import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.KeyExtent;
+ import org.apache.accumulo.core.metadata.schema.DataFileValue;
++import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
++import org.apache.accumulo.core.replication.StatusUtil;
+ import org.apache.accumulo.core.security.Credentials;
+ import org.apache.accumulo.core.util.MapCounter;
+ import org.apache.accumulo.core.util.Pair;
+ import org.apache.accumulo.core.util.UtilWaitThread;
+ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+ import org.apache.accumulo.server.ServerConstants;
+ import org.apache.accumulo.server.client.HdfsZooInstance;
+ import org.apache.accumulo.server.fs.FileRef;
+ import org.apache.accumulo.server.fs.VolumeManager;
+ import org.apache.accumulo.server.master.state.TServerInstance;
+ import org.apache.accumulo.server.security.SystemCredentials;
+ import org.apache.accumulo.server.util.MasterMetadataUtil;
+ import org.apache.accumulo.server.util.MetadataTableUtil;
++import org.apache.accumulo.server.util.ReplicationTableUtil;
+ import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+ import org.apache.accumulo.trace.instrument.Span;
+ import org.apache.accumulo.trace.instrument.Trace;
+ import org.apache.accumulo.tserver.TLevel;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.log4j.Logger;
+
+ class DatafileManager {
+ private final Logger log = Logger.getLogger(DatafileManager.class);
+ // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
+ private final Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
+ private final Tablet tablet;
+
+ // ensure we only have one reader/writer of our bulk file notes at a time
+ private final Object bulkFileImportLock = new Object();
+
+ DatafileManager(Tablet tablet, SortedMap<FileRef,DataFileValue> datafileSizes) { // Creates a manager for the given tablet, seeded with the supplied file-to-size entries.
+ for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet()) { // copy entry by entry into this object's own synchronized map; the caller's map is not retained
+ this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
+ }
+ this.tablet = tablet;
+ }
+
+ private FileRef mergingMinorCompactionFile = null;
+ private final Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
+ private final Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
+ private final MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
+ private long nextScanReservationId = 0;
+ private boolean reservationsBlocked = false;
+
+ private final Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
+
+ static void rename(VolumeManager fs, Path src, Path dst) throws IOException { // Renames src to dst via fs, turning a false return value into an exception.
+ if (!fs.rename(src, dst)) { // fs.rename reports failure through its boolean result
+ throw new IOException("Rename " + src + " to " + dst + " returned false ");
+ }
+ }
+
+ Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() { // Reserves the current set of data files for a scan; returns the new reservation id paired with a file-to-size map for those files.
+ synchronized (tablet) { // all reservation bookkeeping is guarded by the tablet's monitor
+
+ while (reservationsBlocked) { // hold off while reservations are blocked (the flag is set and cleared in waitForScansToFinish)
+ try {
+ tablet.wait(50); // timed wait, so the flag is re-checked at least every 50 ms
+ } catch (InterruptedException e) {
+ log.warn(e, e); // interruption is only logged; the loop keeps waiting on the flag
+ }
+ }
+
+ Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet()); // snapshot of the files known at reservation time
+
+ long rid = nextScanReservationId++; // ids are handed out sequentially
+
+ scanFileReservations.put(rid, absFilePaths); // remembered so returnFilesForScan can release exactly these files
+
+ Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
+
+ for (FileRef path : absFilePaths) {
+ fileScanReferenceCounts.increment(path, 1); // one more scan references this file
+ ret.put(path, datafileSizes.get(path));
+ }
+
+ return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
+ }
+ }
+
+ void returnFilesForScan(Long reservationId) { // Releases a reservation made by reserveFilesForScan and removes scan references for files that were queued for deletion and are no longer in use.
+
+ final Set<FileRef> filesToDelete = new HashSet<FileRef>(); // collected under the lock, acted on after it is released
+
+ synchronized (tablet) {
+ Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
+
+ if (absFilePaths == null) // no reservation is recorded under this id
+ throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
+
+ boolean notify = false;
+ for (FileRef path : absFilePaths) {
+ long refCount = fileScanReferenceCounts.decrement(path, 1);
+ if (refCount == 0) { // this reservation was the last scan reference to the file
+ if (filesToDeleteAfterScan.remove(path)) // only files previously queued by removeFilesAfterScan are collected
+ filesToDelete.add(path);
+ notify = true; // set for every file whose count reached zero, whether or not it was queued for deletion
+ } else if (refCount < 0)
+ throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); // count went negative: bookkeeping is inconsistent
+ }
+
+ if (notify)
+ tablet.notifyAll(); // wake threads waiting on the tablet monitor, e.g. in waitForScansToFinish
+ }
+
+ if (filesToDelete.size() > 0) { // metadata update is performed outside the synchronized block
+ log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
+ MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
+ }
+ }
+
+ void removeFilesAfterScan(Set<FileRef> scanFiles) { // Removes scan references for the given files: immediately for files no scan is using, deferred for the rest.
+ if (scanFiles.size() == 0) // nothing to do for an empty set
+ return;
+
+ Set<FileRef> filesToDelete = new HashSet<FileRef>();
+
+ synchronized (tablet) {
+ for (FileRef path : scanFiles) {
+ if (fileScanReferenceCounts.get(path) == 0) // not referenced by any scan reservation
+ filesToDelete.add(path);
+ else
+ filesToDeleteAfterScan.add(path); // still in use; returnFilesForScan picks it up when the count reaches zero
+ }
+ }
+
+ if (filesToDelete.size() > 0) { // metadata update is performed outside the synchronized block
+ log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
+ MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
+ }
+ }
+
+ private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
+ long startTime = System.currentTimeMillis();
+ TreeSet<FileRef> inUse = new TreeSet<FileRef>();
+
+ Span waitForScans = Trace.start("waitForScans");
+ try {
+ synchronized (tablet) {
+ if (blockNewScans) {
+ if (reservationsBlocked)
+ throw new IllegalStateException();
+
+ reservationsBlocked = true;
+ }
+
+ for (FileRef path : pathsToWaitFor) {
+ while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
+ try {
+ tablet.wait(100);
+ } catch (InterruptedException e) {
+ log.warn(e, e);
+ }
+ }
+ }
+
+ for (FileRef path : pathsToWaitFor) {
+ if (fileScanReferenceCounts.get(path) > 0)
+ inUse.add(path);
+ }
+
+ if (blockNewScans) {
+ reservationsBlocked = false;
+ tablet.notifyAll();
+ }
+
+ }
+ } finally {
+ waitForScans.stop();
+ }
+ return inUse;
+ }
+
+ public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
+
+ final KeyExtent extent = tablet.getExtent();
+ String bulkDir = null;
+
+ Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
+ for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
+ paths.put(entry.getKey(), entry.getValue());
+
+ for (FileRef tpath : paths.keySet()) {
+
+ boolean inTheRightDirectory = false;
+ Path parent = tpath.path().getParent().getParent();
+ for (String tablesDir : ServerConstants.getTablesDirs()) {
+ if (parent.equals(new Path(tablesDir, tablet.getExtent().getTableId().toString()))) {
+ inTheRightDirectory = true;
+ break;
+ }
+ }
+ if (!inTheRightDirectory) {
+ throw new IOException("Data file " + tpath + " not in table dirs");
+ }
+
+ if (bulkDir == null)
+ bulkDir = tpath.path().getParent().toString();
+ else if (!bulkDir.equals(tpath.path().getParent().toString()))
+ throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
+
+ }
+
+ if (tablet.getExtent().isRootTablet()) {
+ throw new IllegalArgumentException("Can not import files to root tablet");
+ }
+
+ synchronized (bulkFileImportLock) {
+ Credentials creds = SystemCredentials.get();
+ Connector conn;
+ try {
+ conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ // Remove any bulk files we've previously loaded and compacted away
+ List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
+
+ for (FileRef file : files)
+ if (paths.keySet().remove(file))
+ log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
+
+ if (paths.size() > 0) {
+ long bulkTime = Long.MIN_VALUE;
+ if (setTime) {
+ for (DataFileValue dfv : paths.values()) {
+ long nextTime = tablet.getAndUpdateTime();
+ if (nextTime < bulkTime)
+ throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime);
+ bulkTime = nextTime;
+ dfv.setTime(bulkTime);
+ }
+ }
+
+ tablet.updatePersistedTime(bulkTime, paths, tid);
+ }
+ }
+
+ synchronized (tablet) {
+ for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
+ if (datafileSizes.containsKey(tpath.getKey())) {
+ log.error("Adding file that is already in set " + tpath.getKey());
+ }
+ datafileSizes.put(tpath.getKey(), tpath.getValue());
+
+ }
+
+ tablet.getTabletResources().importedMapFiles();
+
+ tablet.computeNumEntries();
+ }
+
+ for (Entry<FileRef,DataFileValue> entry : paths.entrySet()) {
+ log.log(TLevel.TABLET_HIST, tablet.getExtent() + " import " + entry.getKey() + " " + entry.getValue());
+ }
+ }
+
+ FileRef reserveMergingMinorCompactionFile() {
+ if (mergingMinorCompactionFile != null)
+ throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile);
+
+ if (tablet.getExtent().isRootTablet())
+ return null;
+
+ int maxFiles = tablet.getTableConfiguration().getMaxFilesPerTablet();
+
+ // when a major compaction is running and we are at max files, write out
+ // one extra file... want to avoid the case where major compaction is
+ // compacting everything except for the largest file, and therefore the
+ // largest file is returned for merging.. the following check mostly
+ // avoids this case, except for the case where major compactions fail or
+ // are canceled
+ if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
+ return null;
+
+ if (datafileSizes.size() >= maxFiles) {
+ // find the smallest file
+
+ long min = Long.MAX_VALUE;
+ FileRef minName = null;
+
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+ if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
+ min = entry.getValue().getSize();
+ minName = entry.getKey();
+ }
+ }
+
+ if (minName == null)
+ return null;
+
+ mergingMinorCompactionFile = minName;
+ return minName;
+ }
+
+ return null;
+ }
+
+ void unreserveMergingMinorCompactionFile(FileRef file) {
+ if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
+ || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
+ throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
+
+ mergingMinorCompactionFile = null;
+ }
+
+ /**
+  * Makes the file written by a minor compaction part of this tablet.
+  *
+  * The steps run in this order:
+  * <ol>
+  * <li>for the root tablet, verify that the tablet server's ZooKeeper lock is still held</li>
+  * <li>delete {@code tmpDatafile} if it holds no entries, otherwise rename it to {@code newDatafile}; an IOException is retried every 60 seconds</li>
+  * <li>write a delete entry for {@code absMergeFile}, if there is one</li>
+  * <li>update the tablet's data file entry and, when replication is enabled for the table, record the unused write-ahead logs for replication</li>
+  * <li>write the minor compaction finish event to the write-ahead log; an IOException is retried every second</li>
+  * <li>under the tablet lock, update the in-memory file map, release the merge file reservation and mark the flush complete</li>
+  * <li>hand the merge file to {@code removeFilesAfterScan}</li>
+  * </ol>
+  *
+  * @param tmpDatafile
+  *          temporary file holding the compaction output
+  * @param newDatafile
+  *          final name of the compaction output
+  * @param absMergeFile
+  *          existing file that was merged into the output, or null if none was
+  * @param dfv
+  *          size and entry count of the output file
+  * @param commitSession
+  *          commit session the compaction belongs to; supplies the max committed time and the write-ahead log sequence
+  * @param flushId
+  *          flush id passed on to the metadata update and to {@code tablet.flushComplete}
+  * @throws IOException
+  *           declared by the signature; IOExceptions from the rename and write-ahead log steps are retried rather than propagated
+  * @throws IllegalStateException
+  *           if this is the root tablet and the lock check fails, or the merge file reservation disagrees with {@code absMergeFile}
+  */
+ void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
+     throws IOException {
+
+   IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+   if (tablet.getExtent().isRootTablet()) {
+     try {
+       if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
+         throw new IllegalStateException();
+       }
+     } catch (Exception e) {
+       // also wraps the bare IllegalStateException thrown just above
+       throw new IllegalStateException("Can not bring minor compaction online, lock not held", e);
+     }
+   }
+
+   // rename before putting in metadata table, so files in metadata table should
+   // always exist
+   do {
+     try {
+       if (dfv.getNumEntries() == 0) {
+         tablet.getTabletServer().getFileSystem().deleteRecursively(tmpDatafile.path());
+       } else {
+         if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
+           log.warn("Target map file already exist " + newDatafile);
+           tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
+         }
+
+         rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
+       }
+       break;
+     } catch (IOException ioe) {
+       log.warn("Tablet " + tablet.getExtent() + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe);
+       UtilWaitThread.sleep(60 * 1000);
+     }
+   } while (true);
+
+   long t1, t2;
+
+   // the code below always assumes merged files are in use by scans... this must be done
+   // because the in memory list of files is not updated until after the metadata table
+   // therefore the file is available to scans until memory is updated, but want to ensure
+   // the file is not available for garbage collection... if memory were updated
+   // before this point (like major compactions do), then the following code could wait
+   // for scans to finish like major compactions do.... used to wait for scans to finish
+   // here, but that was incorrect because a scan could start after waiting but before
+   // memory was updated... assuming the file is always in use by scans leads to
+   // one unneeded metadata update when it was not actually in use
+   Set<FileRef> filesInUseByScans = Collections.emptySet();
+   if (absMergeFile != null)
+     filesInUseByScans = Collections.singleton(absMergeFile);
+
+   // very important to write delete entries outside of log lock, because
+   // this metadata write does not go up... it goes sideways or to itself
+   if (absMergeFile != null)
+     MetadataTableUtil.addDeleteEntries(tablet.getExtent(), Collections.singleton(absMergeFile), SystemCredentials.get());
+
+   Set<String> unusedWalLogs = tablet.beginClearingUnusedLogs();
++  boolean replicate = ReplicationConfigurationUtil.isEnabled(tablet.getExtent(), tablet.getTableConfiguration());
++  Set<String> logFileOnly = null;
++  if (replicate) {
++    // unusedWalLogs is of the form host/fileURI, need to strip off the host portion
++    logFileOnly = new HashSet<>();
++    for (String unusedWalLog : unusedWalLogs) {
++      int index = unusedWalLog.indexOf('/');
++      if (-1 == index) {
++        // no separator found: the entry is recorded unchanged
++        log.warn("Could not find host component to strip from DFSLogger representation of WAL " + unusedWalLog);
++      } else {
++        unusedWalLog = unusedWalLog.substring(index + 1);
++      }
++      logFileOnly.add(unusedWalLog);
++    }
++  }
+   try {
+     // the order of writing to metadata and walog is important in the face of machine/process failures
+     // need to write to metadata before writing to walog, when things are done in the reverse order
+     // data could be lost... the minor compaction start event should be written before the following metadata
+     // write is made
+
+     tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, absMergeFile, dfv, unusedWalLogs, filesInUseByScans, flushId);
+
++    // Mark that we have data we want to replicate
++    // This WAL could still be in use by other Tablets *from the same table*, so we can only mark that there is data to replicate,
++    // but it is *not* closed
++    if (replicate) {
++      ReplicationTableUtil.updateFiles(SystemCredentials.get(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength());
++    }
+   } finally {
+     tablet.finishClearingUnusedLogs();
+   }
+
+   do {
+     try {
+       // the purpose of making this update use the new commit session, instead of the old one passed in,
+       // is because the new one will reference the logs used by current memory...
+
+       tablet.getTabletServer().minorCompactionFinished(tablet.getTabletMemory().getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
+       break;
+     } catch (IOException e) {
+       log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
+       UtilWaitThread.sleep(1 * 1000);
+     }
+   } while (true);
+
+   synchronized (tablet) {
+     t1 = System.currentTimeMillis();
+
+     if (datafileSizes.containsKey(newDatafile)) {
+       log.error("Adding file that is already in set " + newDatafile);
+     }
+
+     // an empty output file was deleted above, so it is not tracked
+     if (dfv.getNumEntries() > 0) {
+       datafileSizes.put(newDatafile, dfv);
+     }
+
+     if (absMergeFile != null) {
+       datafileSizes.remove(absMergeFile);
+     }
+
+     unreserveMergingMinorCompactionFile(absMergeFile);
+
+     tablet.flushComplete(flushId);
+
+     t2 = System.currentTimeMillis();
+   }
+
+   // must do this after list of files in memory is updated above
+   removeFilesAfterScan(filesInUseByScans);
+
+   if (absMergeFile != null)
+     log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [" + absMergeFile + ",memory] -> " + newDatafile);
+   else
+     log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [memory] -> " + newDatafile);
+   log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, tablet.getExtent().toString()));
+   long splitSize = tablet.getTableConfiguration().getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
+   if (dfv.getSize() > splitSize) {
+     log.debug(String.format("Minor Compaction wrote out file larger than split threshold. split threshold = %,d file size = %,d", splitSize, dfv.getSize()));
+   }
+ }
+
+ /**
+  * Records the files a major compaction is about to work on.
+  *
+  * @param files
+  *          files to reserve
+  * @throws IllegalStateException
+  *           if files are already reserved for a major compaction, or {@code files} includes the file reserved by a merging minor compaction
+  */
+ public void reserveMajorCompactingFiles(Collection<FileRef> files) {
+   if (!majorCompactingFiles.isEmpty()) {
+     throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
+   }
+
+   final boolean clashesWithMinorCompaction = mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile);
+   if (clashesWithMinorCompaction) {
+     throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile);
+   }
+
+   majorCompactingFiles.addAll(files);
+ }
+
+ /**
+  * Removes every entry from {@code majorCompactingFiles}, releasing whatever {@code reserveMajorCompactingFiles} reserved.
+  */
+ public void clearMajorCompactingFile() {
+ majorCompactingFiles.clear();
+ }
+
+ /**
+  * Replaces the files consumed by a major compaction with the file it produced.
+  *
+  * For a non-root tablet the output is first renamed from {@code tmpDatafile} to {@code newDatafile} (and deleted again if it has no entries). The
+  * in-memory file bookkeeping is then switched over while holding the tablet lock, and finally the metadata is updated through
+  * {@code MasterMetadataUtil.replaceDatafiles}. For the root tablet the file swap is instead performed by {@code RootFiles.replaceFiles} while the tablet
+  * lock is held, after waiting for scans of the old files to finish and checking that the tablet server's ZooKeeper lock is held.
+  *
+  * @param oldDatafiles
+  *          files that were compacted and are being replaced
+  * @param tmpDatafile
+  *          temporary file holding the compaction output
+  * @param newDatafile
+  *          final name of the compaction output
+  * @param compactionId
+  *          id stored via {@code tablet.setLastCompactionID} and passed to the metadata update
+  * @param dfv
+  *          size and entry count of the output file
+  * @throws IOException
+  *           declared by the signature (the filesystem calls made here may presumably raise it)
+  * @throws IllegalStateException
+  *           if the target file already exists (non-root tablet), or the lock check fails (root tablet)
+  */
+ void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
+ throws IOException {
+ final KeyExtent extent = tablet.getExtent();
+ long t1, t2;
+
+ if (!extent.isRootTablet()) {
+
+ if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
+ // the throwaway Exception only serves to get a stack trace into the log
+ log.error("Target map file already exist " + newDatafile, new Exception());
+ throw new IllegalStateException("Target map file already exist " + newDatafile);
+ }
+
+ // rename before putting in metadata table, so files in metadata table should
+ // always exist
+ rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
+
+ // an output file without entries is removed again right after the rename
+ if (dfv.getNumEntries() == 0) {
+ tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
+ }
+ }
+
+ TServerInstance lastLocation = null;
+ synchronized (tablet) {
+
+ t1 = System.currentTimeMillis();
+
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+ tablet.incrementDataSourceDeletions();
+
+ if (extent.isRootTablet()) {
+
+ // blocks new scan reservations and waits without a practical time limit; called while already holding the tablet lock
+ waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
+
+ try {
+ if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
+ throw new IllegalStateException();
+ }
+ } catch (Exception e) {
+ // also wraps the bare IllegalStateException thrown just above
+ throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+ }
+
+ // mark files as ready for deletion, but
+ // do not delete them until we successfully
+ // rename the compacted map file, in case
+ // the system goes down
+
+ RootFiles.replaceFiles(tablet.getTableConfiguration(), tablet.getTabletServer().getFileSystem(), tablet.getLocation(), oldDatafiles, tmpDatafile, newDatafile);
+ }
+
+ // atomically remove old files and add new file
+ for (FileRef oldDatafile : oldDatafiles) {
+ if (!datafileSizes.containsKey(oldDatafile)) {
+ log.error("file does not exist in set " + oldDatafile);
+ }
+ datafileSizes.remove(oldDatafile);
+ majorCompactingFiles.remove(oldDatafile);
+ }
+
+ if (datafileSizes.containsKey(newDatafile)) {
+ log.error("Adding file that is already in set " + newDatafile);
+ }
+
+ // an output file without entries is not tracked
+ if (dfv.getNumEntries() > 0) {
+ datafileSizes.put(newDatafile, dfv);
+ }
+
+ // could be used by a follow on compaction in a multipass compaction
+ majorCompactingFiles.add(newDatafile);
+
+ tablet.computeNumEntries();
+
+ lastLocation = tablet.resetLastLocation();
+
+ tablet.setLastCompactionID(compactionId);
+ t2 = System.currentTimeMillis();
+ }
+
+ if (!extent.isRootTablet()) {
+ // wait at most 10000 ms; files still referenced after that are passed along to the metadata update
+ Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
+ // NOTE: the unbraced if guards only the debug message; the metadata update below always runs
+ if (filesInUseByScans.size() > 0)
+ log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
+ MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
+ tablet.getTabletServer().getClientAddressString(), lastLocation, tablet.getTabletServer().getLock());
+ removeFilesAfterScan(filesInUseByScans);
+ }
+
+ log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
+ log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
+ }
+
+ /**
+  * Returns a sorted, unmodifiable snapshot of the tablet's files and their data file values. The copy is taken while holding the tablet lock, so later
+  * changes to the tablet's files are not reflected in it.
+  *
+  * @return unmodifiable copy of the current file map
+  */
+ public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
+   final TreeMap<FileRef,DataFileValue> snapshot;
+   synchronized (tablet) {
+     snapshot = new TreeMap<FileRef,DataFileValue>(datafileSizes);
+   }
+   return Collections.unmodifiableSortedMap(snapshot);
+ }
+
+ /**
+  * Returns an unmodifiable snapshot of the tablet's files. The copy is taken while holding the tablet lock, so later changes to the tablet's files are
+  * not reflected in it.
+  *
+  * @return unmodifiable copy of the current set of files
+  */
+ public Set<FileRef> getFiles() {
+   final HashSet<FileRef> snapshot;
+   synchronized (tablet) {
+     snapshot = new HashSet<FileRef>(datafileSizes.keySet());
+   }
+   return Collections.unmodifiableSet(snapshot);
+ }
+
+ /**
+  * Returns the number of entries currently in {@code datafileSizes}. Unlike {@code getDatafileSizes()} and {@code getFiles()}, this read does not take
+  * the tablet lock.
+  *
+  * @return current number of files tracked for the tablet
+  */
+ public int getNumFiles() {
+ return datafileSizes.size();
+ }
+
+ }