You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/07/06 20:22:41 UTC
[2/4] accumulo git commit: ACCUMULO-3339 extract tablet data needed
by a tablet to its own class
ACCUMULO-3339 extract tablet data needed by a tablet to its own class
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1e2a84f8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1e2a84f8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1e2a84f8
Branch: refs/heads/master
Commit: 1e2a84f836f3c7f09d221b1e7d819d0fce1bd8f7
Parents: eb4c38f
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Jul 6 14:03:15 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Jul 6 14:03:15 2015 -0400
----------------------------------------------------------------------
.../apache/accumulo/server/fs/VolumeUtil.java | 7 +-
.../apache/accumulo/tserver/TabletServer.java | 23 ++-
.../accumulo/tserver/tablet/SplitInfo.java | 85 --------
.../apache/accumulo/tserver/tablet/Tablet.java | 204 ++-----------------
.../accumulo/tserver/tablet/TabletData.java | 203 ++++++++++++++++++
.../apache/accumulo/test/BalanceFasterIT.java | 2 -
6 files changed, 241 insertions(+), 283 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 4722e60..7cd0d9e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -165,13 +164,13 @@ public class VolumeUtil {
}
}
- public static Text switchRootTabletVolume(KeyExtent extent, Text location) throws IOException {
+ public static String switchRootTabletVolume(KeyExtent extent, String location) throws IOException {
if (extent.isRootTablet()) {
- String newLocation = switchVolume(location.toString(), FileType.TABLE, ServerConstants.getVolumeReplacements());
+ String newLocation = switchVolume(location, FileType.TABLE, ServerConstants.getVolumeReplacements());
if (newLocation != null) {
MetadataTableUtil.setRootTabletDir(newLocation);
log.info("Volume replaced " + extent + " : " + location + " -> " + newLocation);
- return new Text(new Path(newLocation).toString());
+ return new Path(newLocation).toString();
}
}
return location;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index dc382f2..a8be243 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -233,9 +233,9 @@ import org.apache.accumulo.tserver.tablet.Compactor;
import org.apache.accumulo.tserver.tablet.KVEntry;
import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.apache.accumulo.tserver.tablet.Scanner;
-import org.apache.accumulo.tserver.tablet.SplitInfo;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.accumulo.tserver.tablet.TabletClosedException;
+import org.apache.accumulo.tserver.tablet.TabletData;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
@@ -1876,7 +1876,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
private void splitTablet(Tablet tablet) {
try {
- TreeMap<KeyExtent,SplitInfo> tabletInfo = splitTablet(tablet, null);
+ TreeMap<KeyExtent,TabletData> tabletInfo = splitTablet(tablet, null);
if (tabletInfo == null) {
// either split or compact not both
// were not able to split... so see if a major compaction is
@@ -1892,10 +1892,10 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
}
- private TreeMap<KeyExtent,SplitInfo> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException {
+ private TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException {
long t1 = System.currentTimeMillis();
- TreeMap<KeyExtent,SplitInfo> tabletInfo = tablet.split(splitPoint);
+ TreeMap<KeyExtent,TabletData> tabletInfo = tablet.split(splitPoint);
if (tabletInfo == null) {
return null;
}
@@ -1906,11 +1906,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
Tablet[] newTablets = new Tablet[2];
- Entry<KeyExtent,SplitInfo> first = tabletInfo.firstEntry();
+ Entry<KeyExtent,TabletData> first = tabletInfo.firstEntry();
TabletResourceManager newTrm0 = resourceManager.createTabletResourceManager(first.getKey(), getTableConfiguration(first.getKey()));
newTablets[0] = new Tablet(TabletServer.this, first.getKey(), newTrm0, first.getValue());
- Entry<KeyExtent,SplitInfo> last = tabletInfo.lastEntry();
+ Entry<KeyExtent,TabletData> last = tabletInfo.lastEntry();
TabletResourceManager newTrm1 = resourceManager.createTabletResourceManager(last.getKey(), getTableConfiguration(last.getKey()));
newTablets[1] = new Tablet(TabletServer.this, last.getKey(), newTrm1, last.getValue());
@@ -2129,11 +2129,16 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
acquireRecoveryMemory(extent);
TabletResourceManager trm = resourceManager.createTabletResourceManager(extent, getTableConfiguration(extent));
-
+ TabletData data;
+ if (extent.isRootTablet()) {
+ data = new TabletData(fs, ZooReaderWriter.getInstance());
+ } else {
+ data = new TabletData(extent, fs, tabletsKeyValues.entrySet().iterator());
+ }
// this opens the tablet file and fills in the endKey in the extent
- locationToOpen = VolumeUtil.switchRootTabletVolume(extent, locationToOpen);
+ data.setDirectory(VolumeUtil.switchRootTabletVolume(extent, data.getDirectory()));
- tablet = new Tablet(TabletServer.this, extent, locationToOpen, trm, tabletsKeyValues);
+ tablet = new Tablet(TabletServer.this, extent, trm, data);
// If a minor compaction starts after a tablet opens, this indicates a log recovery occurred. This recovered data must be minor compacted.
// There are three reasons to wait for this minor compaction to finish before placing the tablet in online tablets.
//
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
deleted file mode 100644
index 64b6a11..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.tablet;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.master.state.TServerInstance;
-
-/**
- * operations are disallowed while we split which is ok since splitting is fast
- *
- * a minor compaction should have taken place before calling this so there should be relatively little left to compact
- *
- * we just need to make sure major compactions aren't occurring if we have the major compactor thread decide who needs splitting we can avoid synchronization
- * issues with major compactions
- *
- */
-
-final public class SplitInfo {
- private final String dir;
- private final SortedMap<FileRef,DataFileValue> datafiles;
- private final String time;
- private final long initFlushID;
- private final long initCompactID;
- private final TServerInstance lastLocation;
- private final Map<Long, ? extends Collection<FileRef>> bulkImported;
-
- SplitInfo(String d, SortedMap<FileRef,DataFileValue> dfv, String time, long initFlushID, long initCompactID, TServerInstance lastLocation,
- Map<Long, ? extends Collection<FileRef>> bulkImported) {
- this.dir = d;
- this.datafiles = dfv;
- this.time = time;
- this.initFlushID = initFlushID;
- this.initCompactID = initCompactID;
- this.lastLocation = lastLocation;
- this.bulkImported = bulkImported;
- }
-
- public String getDir() {
- return dir;
- }
-
- public SortedMap<FileRef,DataFileValue> getDatafiles() {
- return datafiles;
- }
-
- public String getTime() {
- return time;
- }
-
- public long getInitFlushID() {
- return initFlushID;
- }
-
- public long getInitCompactID() {
- return initCompactID;
- }
-
- public TServerInstance getLastLocation() {
- return lastLocation;
- }
-
- public Map<Long, ? extends Collection<FileRef>> getBulkImported() {
- return bulkImported;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index c0fb918..307044f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -17,8 +17,6 @@
package org.apache.accumulo.tserver.tablet;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -79,11 +77,6 @@ import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
import org.apache.accumulo.core.security.Authorizations;
@@ -96,7 +89,6 @@ import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.FileRef;
@@ -166,7 +158,6 @@ import com.google.common.cache.CacheBuilder;
*/
public class Tablet implements TabletCommitter {
static private final Logger log = Logger.getLogger(Tablet.class);
- static private final List<LogEntry> NO_LOG_ENTRIES = Collections.emptyList();
private final TabletServer tabletServer;
private final KeyExtent extent;
@@ -313,167 +304,10 @@ public class Tablet implements TabletCommitter {
this.tableConfiguration = tableConfiguration;
this.extent = extent;
this.configObserver = configObserver;
+ this.splitCreationTime = 0;
}
- public Tablet(TabletServer tabletServer, KeyExtent extent, TabletResourceManager trm, SplitInfo info) throws IOException {
- this(tabletServer, new Text(info.getDir()), extent, trm, info.getDatafiles(), info.getTime(), info.getInitFlushID(), info.getInitCompactID(), info
- .getLastLocation(), info.getBulkImported());
- splitCreationTime = System.currentTimeMillis();
- }
-
- private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue> datafiles,
- String time, long initFlushID, long initCompactID, TServerInstance lastLocation, Map<Long, ? extends Collection<FileRef>> bulkImported) throws IOException {
- this(tabletServer, extent, location, trm, NO_LOG_ENTRIES, datafiles, time, lastLocation, new HashSet<FileRef>(), initFlushID, initCompactID, bulkImported);
- }
-
- private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
- SortedMap<Key,Value> entries;
-
- if (extent.isRootTablet()) {
- return null;
- } else {
- entries = new TreeMap<Key,Value>();
- Text rowName = extent.getMetadataEntry();
- for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
- if (entry.getKey().compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
- entries.put(new Key(entry.getKey()), new Value(entry.getValue()));
- }
- }
- }
-
- if (entries.size() == 1)
- return entries.values().iterator().next().toString();
- return null;
- }
-
- private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloServerContext context, VolumeManager fs, KeyExtent extent,
- SortedMap<Key,Value> tabletsKeyValues) throws IOException {
-
- TreeMap<FileRef,DataFileValue> result = new TreeMap<FileRef,DataFileValue>();
-
- if (extent.isRootTablet()) { // the meta0 tablet
- Path location = new Path(MetadataTableUtil.getRootTabletDir());
-
- // cleanUpFiles() has special handling for delete. files
- FileStatus[] files = fs.listStatus(location);
- Collection<String> goodPaths = RootFiles.cleanupReplacement(fs, files, true);
- for (String good : goodPaths) {
- Path path = new Path(good);
- String filename = path.getName();
- FileRef ref = new FileRef(location.toString() + "/" + filename, path);
- DataFileValue dfv = new DataFileValue(0, 0);
- result.put(ref, dfv);
- }
- } else {
- final Text buffer = new Text();
-
- for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
- Key k = entry.getKey();
- k.getColumnFamily(buffer);
- // Ignore anything but file:
- if (TabletsSection.DataFileColumnFamily.NAME.equals(buffer)) {
- FileRef ref = new FileRef(fs, k);
- result.put(ref, new DataFileValue(entry.getValue().get()));
- }
- }
- }
- return result;
- }
-
- private static List<LogEntry> lookupLogEntries(SortedMap<Key,Value> tabletsKeyValues, AccumuloServerContext context, KeyExtent ke) {
- List<LogEntry> result = new ArrayList<LogEntry>();
-
- if (ke.isRootTablet()) {
- try {
- result = MetadataTableUtil.getLogEntries(context, ke);
- } catch (Exception ex) {
- throw new RuntimeException("Unable to read tablet log entries", ex);
- }
- } else {
- log.debug("Looking at metadata " + tabletsKeyValues);
- for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
- Key key = entry.getKey();
- if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
- result.add(LogEntry.fromKeyValue(key, entry.getValue()));
- }
- }
- }
-
- log.debug("got " + result + " for logs for " + ke);
- return result;
- }
-
- private static Set<FileRef> lookupScanFiles(SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws IOException {
- HashSet<FileRef> result = new HashSet<FileRef>();
-
- for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
- Key key = entry.getKey();
- if (key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
- result.add(new FileRef(fs, key));
- }
- }
-
- return result;
- }
-
- private static long lookupFlushID(SortedMap<Key,Value> tabletsKeyValues) {
- for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
- Key key = entry.getKey();
- if (FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
- return Long.parseLong(entry.getValue().toString());
- }
-
- return -1;
- }
-
- private static long lookupCompactID(SortedMap<Key,Value> tabletsKeyValues) {
- for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
- Key key = entry.getKey();
- if (COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
- return Long.parseLong(entry.getValue().toString());
- }
-
- return -1;
- }
-
- private static TServerInstance lookupLastServer(SortedMap<Key,Value> tabletsKeyValues) {
- for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
- if (entry.getKey().getColumnFamily().compareTo(LastLocationColumnFamily.NAME) == 0) {
- return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
- }
- }
- return null;
- }
-
- private static Map<Long, List<FileRef>> lookupBulkImported(SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) {
- Map<Long,List<FileRef>> result = new HashMap<>();
- for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
- if (entry.getKey().getColumnFamily().compareTo(BulkFileColumnFamily.NAME) == 0) {
- Long id = Long.decode(entry.getValue().toString());
- List<FileRef> lst = result.get(id);
- if (lst == null) {
- lst = new ArrayList<FileRef>();
- }
- lst.add(new FileRef(fs, entry.getKey()));
- }
- }
- return result;
- }
-
- public Tablet(TabletServer tabletServer, KeyExtent extent, Text location, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
- throws IOException {
- this(tabletServer, extent, location, trm, lookupLogEntries(tabletsKeyValues, tabletServer, extent), lookupDatafiles(tabletServer,
- tabletServer.getFileSystem(), extent, tabletsKeyValues), lookupTime(tabletServer.getConfiguration(), extent, tabletsKeyValues),
- lookupLastServer(tabletsKeyValues), lookupScanFiles(tabletsKeyValues, tabletServer.getFileSystem()), lookupFlushID(tabletsKeyValues),
- lookupCompactID(tabletsKeyValues), lookupBulkImported(tabletsKeyValues, tabletServer.getFileSystem()));
- }
-
- /**
- * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time
- */
- private Tablet(final TabletServer tabletServer, final KeyExtent extent, final Text location, final TabletResourceManager trm,
- final List<LogEntry> rawLogEntries, final SortedMap<FileRef,DataFileValue> rawDatafiles, String time, final TServerInstance lastLocation,
- final Set<FileRef> scanFiles, final long initFlushID, final long initCompactID, final Map<Long, ? extends Collection<FileRef>> bulkImported) throws IOException {
+ public Tablet(final TabletServer tabletServer, final KeyExtent extent, final TabletResourceManager trm, TabletData data) throws IOException {
TableConfiguration tblConf = tabletServer.getTableConfiguration(extent);
if (null == tblConf) {
@@ -484,8 +318,10 @@ public class Tablet implements TabletCommitter {
this.tableConfiguration = tblConf;
- TabletFiles tabletPaths = VolumeUtil.updateTabletVolumes(tabletServer, tabletServer.getLock(), tabletServer.getFileSystem(), extent, new TabletFiles(
- location.toString(), rawLogEntries, rawDatafiles), ReplicationConfigurationUtil.isEnabled(extent, this.tableConfiguration));
+ TabletFiles tabletPaths = VolumeUtil
+ .updateTabletVolumes(tabletServer, tabletServer.getLock(), tabletServer.getFileSystem(), extent,
+ new TabletFiles(data.getDirectory(), data.getLogEntris(), data.getDataFiles()),
+ ReplicationConfigurationUtil.isEnabled(extent, this.tableConfiguration));
Path locationPath;
@@ -499,14 +335,16 @@ public class Tablet implements TabletCommitter {
final SortedMap<FileRef,DataFileValue> datafiles = tabletPaths.datafiles;
this.location = locationPath;
- this.lastLocation = lastLocation;
+ this.lastLocation = data.getLastLocation();
this.tabletDirectory = tabletPaths.dir;
this.extent = extent;
this.tabletResources = trm;
- this.lastFlushID = initFlushID;
- this.lastCompactID = initCompactID;
+ this.lastFlushID = data.getFlushID();
+ this.lastCompactID = data.getCompactID();
+ this.splitCreationTime = data.getSplitTime();
+ String time = data.getTime();
if (extent.isRootTablet()) {
long rtime = Long.MIN_VALUE;
@@ -590,8 +428,8 @@ public class Tablet implements TabletCommitter {
// Force a load of any per-table properties
configObserver.propertiesChanged();
- for (Long key : bulkImported.keySet()) {
- this.bulkImported.put(key, new CopyOnWriteArrayList<FileRef>(bulkImported.get(key)));
+ for (Entry<Long,List<FileRef>> entry : data.getBulkImported().entrySet()) {
+ this.bulkImported.put(entry.getKey(), new CopyOnWriteArrayList<FileRef>(entry.getValue()));
}
if (!logEntries.isEmpty()) {
@@ -681,7 +519,7 @@ public class Tablet implements TabletCommitter {
computeNumEntries();
- getDatafileManager().removeFilesAfterScan(scanFiles);
+ getDatafileManager().removeFilesAfterScan(data.getScanFiles());
// look for hints of a failure on the previous tablet server
if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) {
@@ -1628,7 +1466,7 @@ public class Tablet implements TabletCommitter {
private boolean sawBigRow = false;
private long timeOfLastMinCWhenBigFreakinRowWasSeen = 0;
private long timeOfLastImportWhenBigFreakinRowWasSeen = 0;
- private long splitCreationTime;
+ private final long splitCreationTime;
private SplitRowSpec findSplitRow(Collection<FileRef> files) {
@@ -2218,7 +2056,7 @@ public class Tablet implements TabletCommitter {
return majorCompactionQueued.size() > 0;
}
- public TreeMap<KeyExtent,SplitInfo> split(byte[] sp) throws IOException {
+ public TreeMap<KeyExtent,TabletData> split(byte[] sp) throws IOException {
if (sp != null && extent.getEndRow() != null && extent.getEndRow().equals(new Text(sp))) {
throw new IllegalArgumentException();
@@ -2253,7 +2091,7 @@ public class Tablet implements TabletCommitter {
synchronized (this) {
// java needs tuples ...
- TreeMap<KeyExtent,SplitInfo> newTablets = new TreeMap<KeyExtent,SplitInfo>();
+ TreeMap<KeyExtent,TabletData> newTablets = new TreeMap<KeyExtent,TabletData>();
long t1 = System.currentTimeMillis();
// choose a split point
@@ -2297,14 +2135,14 @@ public class Tablet implements TabletCommitter {
String time = tabletTime.getMetadataValue();
MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, getTabletServer(), getTabletServer().getLock());
- MasterMetadataUtil.addNewTablet(getTabletServer(), low, lowDirectory, getTabletServer().getTabletSession(), lowDatafileSizes, getBulkIngestedFiles(), time,
- lastFlushID, lastCompactID, getTabletServer().getLock());
+ MasterMetadataUtil.addNewTablet(getTabletServer(), low, lowDirectory, getTabletServer().getTabletSession(), lowDatafileSizes, getBulkIngestedFiles(),
+ time, lastFlushID, lastCompactID, getTabletServer().getLock());
MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove, getTabletServer(), getTabletServer().getLock());
log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high);
- newTablets.put(high, new SplitInfo(tabletDirectory, highDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles()));
- newTablets.put(low, new SplitInfo(lowDirectory, lowDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles()));
+ newTablets.put(high, new TabletData(tabletDirectory, highDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles()));
+ newTablets.put(low, new TabletData(lowDirectory, lowDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles()));
long t2 = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
new file mode 100644
index 0000000..a076284
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TabletData {
+ private static Logger log = LoggerFactory.getLogger(TabletData.class);
+
+ private String time = null;
+ private SortedMap<FileRef,DataFileValue> dataFiles = new TreeMap<>();
+ private List<LogEntry> logEntris = new ArrayList<>();
+ private HashSet<FileRef> scanFiles = new HashSet<>();
+ private long flushID = -1;
+ private long compactID = -1;
+ private TServerInstance lastLocation = null;
+ private Map<Long,List<FileRef>> bulkImported = new HashMap<>();
+ private String directory = null;
+ private long splitTime = 0;
+
+ // read tablet data from metadata tables
+ public TabletData(KeyExtent extent, VolumeManager fs, Iterator<Entry<Key,Value>> entries) {
+ final Text family = new Text();
+ Text rowName = extent.getMetadataEntry();
+ while (entries.hasNext()) {
+ Entry<Key,Value> entry = entries.next();
+ Key key = entry.getKey();
+ Value value = entry.getValue();
+ key.getColumnFamily(family);
+ if (key.compareRow(rowName) != 0) {
+ log.info("Unexpected metadata table entry for {}: {}", extent, key.getRow());
+ continue;
+ }
+ if (ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
+ if (time == null) {
+ time = value.toString();
+ }
+ } else if (DataFileColumnFamily.NAME.equals(family)) {
+ FileRef ref = new FileRef(fs, key);
+ dataFiles.put(ref, new DataFileValue(entry.getValue().get()));
+ } else if (DIRECTORY_COLUMN.hasColumns(key)) {
+ directory = value.toString();
+ } else if (family.equals(LogColumnFamily.NAME)) {
+ logEntris.add(LogEntry.fromKeyValue(key, entry.getValue()));
+ } else if (family.equals(ScanFileColumnFamily.NAME)) {
+ scanFiles.add(new FileRef(fs, key));
+ } else if (FLUSH_COLUMN.hasColumns(key)) {
+ flushID = Long.parseLong(value.toString());
+ } else if (COMPACT_COLUMN.hasColumns(key)) {
+ compactID = Long.parseLong(entry.getValue().toString());
+ } else if (family.equals(LastLocationColumnFamily.NAME)) {
+ lastLocation = new TServerInstance(value, key.getColumnQualifier());
+ } else if (family.equals(BulkFileColumnFamily.NAME)) {
+ Long id = Long.decode(value.toString());
+ List<FileRef> lst = bulkImported.get(id);
+ if (lst == null) {
+ bulkImported.put(id, lst = new ArrayList<>());
+ }
+ lst.add(new FileRef(fs, key));
+ } else if (PREV_ROW_COLUMN.hasColumns(key)) {
+ KeyExtent check = new KeyExtent(key.getRow(), value);
+ if (!check.equals(extent)) {
+ throw new RuntimeException("Found bad entry for " + extent + ": " + check);
+ }
+ }
+ }
+ }
+
+ // read basic root table metadata from zookeeper
+ public TabletData(VolumeManager fs, ZooReader rdr) throws IOException {
+ Path location = new Path(MetadataTableUtil.getRootTabletDir());
+
+ // cleanUpFiles() has special handling for deleting files
+ FileStatus[] files = fs.listStatus(location);
+ Collection<String> goodPaths = RootFiles.cleanupReplacement(fs, files, true);
+ for (String good : goodPaths) {
+ Path path = new Path(good);
+ String filename = path.getName();
+ FileRef ref = new FileRef(location.toString() + "/" + filename, path);
+ DataFileValue dfv = new DataFileValue(0, 0);
+ dataFiles.put(ref, dfv);
+ }
+ try {
+ logEntris = MetadataTableUtil.getLogEntries(null, RootTable.EXTENT);
+ } catch (Exception ex) {
+ throw new RuntimeException("Unable to read tablet log entries", ex);
+ }
+ directory = MetadataTableUtil.getRootTabletDir();
+ }
+
+ // split
+ public TabletData(String tabletDirectory, SortedMap<FileRef,DataFileValue> highDatafileSizes, String time, long lastFlushID, long lastCompactID,
+ TServerInstance lastLocation, Map<Long,List<FileRef>> bulkIngestedFiles) {
+ this.directory = tabletDirectory;
+ this.dataFiles = highDatafileSizes;
+ this.time = time;
+ this.flushID = lastFlushID;
+ this.compactID = lastCompactID;
+ this.lastLocation = lastLocation;
+ this.bulkImported = bulkIngestedFiles;
+ this.splitTime = System.currentTimeMillis();
+ }
+
+ public static Logger getLog() {
+ return log;
+ }
+
+ public String getTime() {
+ return time;
+ }
+
+ public SortedMap<FileRef,DataFileValue> getDataFiles() {
+ return dataFiles;
+ }
+
+ public List<LogEntry> getLogEntris() {
+ return logEntris;
+ }
+
+ public HashSet<FileRef> getScanFiles() {
+ return scanFiles;
+ }
+
+ public long getFlushID() {
+ return flushID;
+ }
+
+ public long getCompactID() {
+ return compactID;
+ }
+
+ public TServerInstance getLastLocation() {
+ return lastLocation;
+ }
+
+ public Map<Long,List<FileRef>> getBulkImported() {
+ return bulkImported;
+ }
+
+ public String getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(String directory) {
+ this.directory = directory;
+ }
+
+ public long getSplitTime() {
+ return splitTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java
index 4418fe7..ab70224 100644
--- a/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java
@@ -45,8 +45,6 @@ import org.junit.Test;
// ACCUMULO-2952
public class BalanceFasterIT extends ConfigurableMacBase {
-
-
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setNumTservers(3);