You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/06/03 20:49:16 UTC
[05/10] ACCUMULO-2041 extract tablet classes to new files,
move tablet-related code to o.a.a.tserver.tablet,
make member variables private
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java
new file mode 100644
index 0000000..9278cb2
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.MinorCompactionReason;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+
+/**
+ * Runs one minor compaction for a tablet. It picks a new output file name, waits for in-flight commits, records a
+ * minor-compaction-start event with the tablet server, and then calls {@link Tablet#minorCompact} on the tablet's
+ * minor-compaction memory table (passing {@code mergeFile} through when one was given). Afterwards it either hands the
+ * tablet to the tablet server for splitting or requests a {@link MajorCompactionReason#NORMAL} major compaction.
+ */
+class MinorCompactionTask implements Runnable {
+
+  private final Tablet tablet;
+  /** Creation time of this task (ms since epoch), passed through to the compaction. */
+  private final long queued;
+  private final CommitSession commitSession;
+  /** Result of the compaction; remains null if the compaction fails before producing one. */
+  private DataFileValue stats;
+  private final FileRef mergeFile;
+  private final long flushId;
+  private final MinorCompactionReason mincReason;
+
+  MinorCompactionTask(Tablet tablet, FileRef mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
+    this.tablet = tablet;
+    queued = System.currentTimeMillis();
+    tablet.minorCompactionWaitingToStart();
+    this.commitSession = commitSession;
+    this.mergeFile = mergeFile;
+    this.flushId = flushId;
+    this.mincReason = mincReason;
+  }
+
+  @Override
+  public void run() {
+    // NOTE(review): the boolean returned by isMinorCompactionRunning() is discarded, so this call has no visible effect
+    // here. If the intent was to mark the compaction as started (the counterpart of minorCompactionWaitingToStart() in
+    // the constructor), a state-changing Tablet method should be called instead -- confirm against Tablet.
+    tablet.isMinorCompactionRunning();
+    Span minorCompaction = Trace.on("minorCompaction");
+    try {
+      // file name prefix: "F" when there is no merge file, "M" when merging with one
+      FileRef newMapfileLocation = tablet.getNextMapFilename(mergeFile == null ? "F" : "M");
+      FileRef tmpFileRef = new FileRef(newMapfileLocation.path() + "_tmp");
+      Span span = Trace.start("waitForCommits");
+      synchronized (tablet) {
+        commitSession.waitForCommitsToFinish();
+      }
+      span.stop();
+      span = Trace.start("start");
+      while (true) {
+        try {
+          // the purpose of the minor compaction start event is to keep track of the filename... in the case
+          // where the metadata table write for the minor compaction finishes and the process dies before
+          // writing the minor compaction finish event, then the start event+filename in metadata table will
+          // prevent recovery of duplicate data... the minor compaction start event could be written at any time
+          // before the metadata write for the minor compaction
+          tablet.getTabletServer().minorCompactionStarted(commitSession, commitSession.getWALogSeq() + 1, newMapfileLocation.path().toString());
+          break;
+        } catch (IOException e) {
+          // retried until it succeeds; there is no delay between attempts
+          Tablet.log.warn("Failed to write to write ahead log " + e.getMessage(), e);
+        }
+      }
+      span.stop();
+      span = Trace.start("compact");
+      this.stats = tablet.minorCompact(tablet.getTabletServer().getFileSystem(), tablet.getTabletMemory().getMinCMemTable(), tmpFileRef,
+          newMapfileLocation, mergeFile, true, queued, commitSession, flushId, mincReason);
+      span.stop();
+
+      if (tablet.needsSplit()) {
+        tablet.getTabletServer().executeSplit(tablet);
+      } else {
+        tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL);
+      }
+    } catch (Throwable t) {
+      Tablet.log.error("Unknown error during minor compaction for extent: " + tablet.getExtent(), t);
+      throw new RuntimeException(t);
+    } finally {
+      tablet.minorCompactionComplete();
+      minorCompaction.data("extent", tablet.getExtent().toString());
+      // stats is still null when the compaction failed before producing a result; dereferencing it unconditionally
+      // would throw a NullPointerException from this finally block and hide the original failure
+      if (stats != null) {
+        minorCompaction.data("numEntries", Long.toString(stats.getNumEntries()));
+        minorCompaction.data("size", Long.toString(stats.getSize()));
+      }
+      minorCompaction.stop();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
new file mode 100644
index 0000000..6636159
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.MinorCompactionReason;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Compactor for minor compactions. It reads the tablet's in-memory map plus, when one is given, a single existing file
+ * ({@code mergeFile}), and writes the result to {@code outputFile} using the {@link IteratorScope#minc} iterator scope.
+ *
+ * <p>
+ * {@link #call()} retries failed attempts indefinitely. Between attempts it sleeps a randomized, growing delay: the base
+ * delay starts at 100 ms, grows 4x per failure, is capped at three minutes, and has up to one base delay of random jitter
+ * added. It also deletes any partial output file. It stops retrying only when the table is found to be deleting.
+ */
+public class MinorCompactor extends Compactor {
+
+  private static final Logger log = Logger.getLogger(MinorCompactor.class);
+
+  private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();
+
+  /** Builds the compaction input map: empty when there is no merge file, otherwise just {@code mergeFile -> dfv}. */
+  private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) {
+    if (mergeFile == null)
+      return EMPTY_MAP;
+
+    return Collections.singletonMap(mergeFile, dfv);
+  }
+
+  public MinorCompactor(VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf,
+      KeyExtent extent, MinorCompactionReason mincReason) {
+    super(fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() {
+
+      @Override
+      public boolean isCompactionEnabled() {
+        return true;
+      }
+
+      @Override
+      public IteratorScope getIteratorScope() {
+        return IteratorScope.minc;
+      }
+    }, Collections.<IteratorSetting>emptyList(), mincReason.ordinal());
+  }
+
+  /** Best-effort table state check; any failure to read the state is treated as "not deleting". */
+  private boolean isTableDeleting() {
+    try {
+      return Tables.getTableState(HdfsZooInstance.getInstance(), getExtent().getTableId().toString()) == TableState.DELETING;
+    } catch (Exception e) {
+      log.warn("Failed to determine if table " + getExtent().getTableId() + " was deleting ", e);
+      return false; // can not get positive confirmation that its deleting.
+    }
+  }
+
+  /**
+   * Runs the minor compaction, retrying until it succeeds.
+   *
+   * @return the stats of the successful attempt, or {@code new CompactionStats(0, 0)} if the table turned out to be
+   *         deleting after a failed attempt
+   * @throws IllegalStateException
+   *           if the underlying compaction reports a {@code CompactionCanceledException}
+   */
+  @Override
+  public CompactionStats call() {
+    log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());
+
+    // output to new MapFile with a temporary name
+    int sleepTime = 100;
+    double growthFactor = 4;
+    int maxSleepTime = 1000 * 60 * 3; // 3 minutes
+    boolean reportedProblem = false;
+    // one generator shared by all retries instead of a new one per failed attempt
+    Random random = new Random();
+
+    runningCompactions.add(this);
+    try {
+      do {
+        try {
+          CompactionStats ret = super.call();
+
+          if (reportedProblem) {
+            // an earlier attempt filed a problem report; this attempt succeeded, so clear it
+            ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile());
+          }
+
+          return ret;
+        } catch (IOException e) {
+          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e);
+          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
+          reportedProblem = true;
+        } catch (RuntimeException e) {
+          // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the
+          // minor compaction would succeed
+          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e);
+          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
+          reportedProblem = true;
+        } catch (CompactionCanceledException e) {
+          throw new IllegalStateException(e);
+        }
+
+        int sleep = sleepTime + random.nextInt(sleepTime);
+        log.debug("MinC failed sleeping " + sleep + " ms before retrying");
+        UtilWaitThread.sleep(sleep);
+        sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));
+
+        // clean up the partial output of the failed attempt before retrying
+        Path outputPath = new Path(getOutputFile());
+        try {
+          if (getFileSystem().exists(outputPath)) {
+            getFileSystem().deleteRecursively(outputPath);
+          }
+        } catch (IOException e) {
+          log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage(), e);
+        }
+
+        if (isTableDeleting())
+          return new CompactionStats(0, 0);
+
+      } while (true);
+    } finally {
+      thread = null;
+      runningCompactions.remove(this);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java
new file mode 100644
index 0000000..450fffe
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+/**
+ * Turns successive samples of a counter into an exponentially smoothed rate. Timestamps are in milliseconds (as used by
+ * {@link #update(long)}), so the rate is expressed per second.
+ */
+public class Rate {
+  private long lastCounter = -1;
+  private long lastTime = -1;
+  private double current = 0.0;
+  final double ratio;
+
+  /**
+   * Turn a counter into an exponentially smoothed rate over time.
+   *
+   * @param ratio
+   *          the weight each new sample gets in the smoothed rate; must be strictly between 0.0 and 1.0
+   * @throws IllegalArgumentException
+   *           if {@code ratio} is not in the open interval (0.0, 1.0), including when it is NaN
+   */
+  public Rate(double ratio) {
+    // negated range check so that NaN (for which every comparison is false) is rejected as well
+    if (!(ratio > 0. && ratio < 1.0))
+      throw new IllegalArgumentException("ratio must be > 0. and < 1.0");
+    this.ratio = ratio;
+  }
+
+  /** Records {@code counter} at the current wall-clock time in milliseconds; see {@link #update(long, long)}. */
+  public double update(long counter) {
+    return update(System.currentTimeMillis(), counter);
+  }
+
+  /**
+   * Records a counter sample and returns the updated smoothed rate.
+   *
+   * <p>
+   * While no baseline has been recorded yet (initially, or after a sample with a negative counter), the sample only
+   * becomes the baseline and the current rate is returned unchanged. Otherwise the instantaneous rate
+   * {@code (counter - lastCounter) * 1000 / (when - lastTime)} is blended into the running value with weight
+   * {@code ratio}.
+   *
+   * @param when
+   *          sample time in milliseconds; must be later than the previous sample's time
+   * @param counter
+   *          the counter value at {@code when}
+   * @return the updated smoothed rate
+   * @throws IllegalArgumentException
+   *           if {@code when} is not later than the previous sample's time
+   */
+  synchronized public double update(long when, long counter) {
+    if (lastCounter < 0) {
+      lastTime = when;
+      lastCounter = counter;
+      return current;
+    }
+    // an equal time would divide by zero, and an earlier time would give a negative interval that corrupts the rate
+    if (when <= lastTime) {
+      throw new IllegalArgumentException("update time " + when + " must be after last update time " + lastTime);
+    }
+    double keep = 1. - ratio;
+    current = keep * current + ratio * (counter - lastCounter) * 1000. / (when - lastTime);
+    lastTime = when;
+    lastCounter = counter;
+    return current;
+  }
+
+  /** Returns the most recently computed smoothed rate (0.0 until two samples have been recorded). */
+  synchronized public double rate() {
+    return this.current;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java
new file mode 100644
index 0000000..3a8bb08
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ *
+ */
+public class RootFiles {
+
+ private static Logger log = Logger.getLogger(RootFiles.class);
+
+ public static void prepareReplacement(VolumeManager fs, Path location, Set<FileRef> oldDatafiles, String compactName) throws IOException {
+ for (FileRef ref : oldDatafiles) {
+ Path path = ref.path();
+ DatafileManager.rename(fs, path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
+ }
+ }
+
+ public static void renameReplacement(VolumeManager fs, FileRef tmpDatafile, FileRef newDatafile) throws IOException {
+ if (fs.exists(newDatafile.path())) {
+ log.error("Target map file already exist " + newDatafile, new Exception());
+ throw new IllegalStateException("Target map file already exist " + newDatafile);
+ }
+
+ DatafileManager.rename(fs, tmpDatafile.path(), newDatafile.path());
+ }
+
+ public static void finishReplacement(AccumuloConfiguration acuTableConf, VolumeManager fs, Path location, Set<FileRef> oldDatafiles, String compactName)
+ throws IOException {
+ // start deleting files, if we do not finish they will be cleaned
+ // up later
+ for (FileRef ref : oldDatafiles) {
+ Path path = ref.path();
+ Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
+ if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile))
+ fs.deleteRecursively(deleteFile);
+ }
+ }
+
+ public static void replaceFiles(AccumuloConfiguration acuTableConf, VolumeManager fs, Path location, Set<FileRef> oldDatafiles, FileRef tmpDatafile,
+ FileRef newDatafile) throws IOException {
+ String compactName = newDatafile.path().getName();
+
+ prepareReplacement(fs, location, oldDatafiles, compactName);
+ renameReplacement(fs, tmpDatafile, newDatafile);
+ finishReplacement(acuTableConf, fs, location, oldDatafiles, compactName);
+ }
+
+ public static Collection<String> cleanupReplacement(VolumeManager fs, FileStatus[] files, boolean deleteTmp) throws IOException {
+ /*
+ * called in constructor and before major compactions
+ */
+ Collection<String> goodFiles = new ArrayList<String>(files.length);
+
+ for (FileStatus file : files) {
+
+ String path = file.getPath().toString();
+ if (file.getPath().toUri().getScheme() == null) {
+ // depending on the behavior of HDFS, if list status does not return fully qualified volumes then could switch to the default volume
+ throw new IllegalArgumentException("Require fully qualified paths " + file.getPath());
+ }
+
+ String filename = file.getPath().getName();
+
+ // check for incomplete major compaction, this should only occur
+ // for root tablet
+ if (filename.startsWith("delete+")) {
+ String expectedCompactedFile = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];
+ if (fs.exists(new Path(expectedCompactedFile))) {
+ // compaction finished, but did not finish deleting compacted files.. so delete it
+ if (!fs.deleteRecursively(file.getPath()))
+ log.warn("Delete of file: " + file.getPath().toString() + " return false");
+ continue;
+ }
+ // compaction did not finish, so put files back
+
+ // reset path and filename for rest of loop
+ filename = filename.split("\\+", 3)[2];
+ path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename;
+
+ DatafileManager.rename(fs, file.getPath(), new Path(path));
+ }
+
+ if (filename.endsWith("_tmp")) {
+ if (deleteTmp) {
+ log.warn("cleaning up old tmp file: " + path);
+ if (!fs.deleteRecursively(file.getPath()))
+ log.warn("Delete of tmp file: " + file.getPath().toString() + " return false");
+
+ }
+ continue;
+ }
+
+ if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
+ log.error("unknown file in tablet" + path);
+ continue;
+ }
+
+ goodFiles.add(path);
+ }
+
+ return goodFiles;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java
new file mode 100644
index 0000000..0ea76d3
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.List;
+
+/**
+ * A batch of key/value results returned by a tablet scan, together with a {@code more} flag.
+ * NOTE(review): {@code more} presumably signals that further batches remain for this scan -- confirm with callers.
+ */
+public class ScanBatch {
+  /** The key/value entries in this batch. */
+  public final List<KVEntry> results;
+  /** Flag supplied by the producer of this batch (see class note). */
+  public final boolean more;
+
+  ScanBatch(List<KVEntry> results, boolean more) {
+    this.more = more;
+    this.results = results;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
new file mode 100644
index 0000000..980a082
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iterators.system.StatsIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
+import org.apache.accumulo.tserver.TabletIteratorEnvironment;
+import org.apache.accumulo.tserver.TabletServer;
+
+class ScanDataSource implements DataSource {
+
+ // data source state
+ private final Tablet tablet;
+ private ScanFileManager fileManager;
+ private SortedKeyValueIterator<Key,Value> iter;
+ private long expectedDeletionCount;
+ private List<MemoryIterator> memIters = null;
+ private long fileReservationId;
+ private AtomicBoolean interruptFlag;
+ private StatsIterator statsIterator;
+
+ private final ScanOptions options;
+
+ ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+ AtomicBoolean interruptFlag) {
+ this.tablet = tablet;
+ expectedDeletionCount = tablet.getDataSourceDeletions();
+ this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
+ this.interruptFlag = interruptFlag;
+ }
+
+ ScanDataSource(Tablet tablet, ScanOptions options) {
+ this.tablet = tablet;
+ expectedDeletionCount = tablet.getDataSourceDeletions();
+ this.options = options;
+ this.interruptFlag = options.interruptFlag;
+ }
+
+ @Override
+ public DataSource getNewDataSource() {
+ if (!isCurrent()) {
+ // log.debug("Switching data sources during a scan");
+ if (memIters != null) {
+ tablet.getTabletMemory().returnIterators(memIters);
+ memIters = null;
+ tablet.getDatafileManager().returnFilesForScan(fileReservationId);
+ fileReservationId = -1;
+ }
+
+ if (fileManager != null)
+ fileManager.releaseOpenFiles(false);
+
+ expectedDeletionCount = tablet.getDataSourceDeletions();
+ iter = null;
+
+ return this;
+ } else
+ return this;
+ }
+
+ @Override
+ public boolean isCurrent() {
+ return expectedDeletionCount == tablet.getDataSourceDeletions();
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+ if (iter == null)
+ iter = createIterator();
+ return iter;
+ }
+
+ private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
+
+ Map<FileRef,DataFileValue> files;
+
+ synchronized (tablet) {
+
+ if (memIters != null)
+ throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");
+
+ if (tablet.isClosed())
+ throw new TabletClosedException();
+
+ if (interruptFlag.get())
+ throw new IterationInterruptedException(tablet.getExtent().toString() + " " + interruptFlag.hashCode());
+
+ // only acquire the file manager when we know the tablet is open
+ if (fileManager == null) {
+ fileManager = tablet.getTabletResources().newScanFileManager();
+ tablet.addActiveScans(this);
+ }
+
+ if (fileManager.getNumOpenFiles() != 0)
+ throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");
+
+ // set this before trying to get iterators in case
+ // getIterators() throws an exception
+ expectedDeletionCount = tablet.getDataSourceDeletions();
+
+ memIters = tablet.getTabletMemory().getIterators();
+ Pair<Long,Map<FileRef,DataFileValue>> reservation = tablet.getDatafileManager().reserveFilesForScan();
+ fileReservationId = reservation.getFirst();
+ files = reservation.getSecond();
+ }
+
+ Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated);
+
+ List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
+
+ iters.addAll(mapfiles);
+ iters.addAll(memIters);
+
+ for (SortedKeyValueIterator<Key,Value> skvi : iters)
+ ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
+
+ MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent());
+
+ TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, tablet.getTableConfiguration(), fileManager, files);
+
+ statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter());
+
+ DeletingIterator delIter = new DeletingIterator(statsIterator, false);
+
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+ ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet);
+
+ VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels);
+
+ return iterEnv.getTopLevelIterator(IteratorUtil
+ .loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(), options.ssiList, options.ssio, iterEnv));
+ }
+
+ void close(boolean sawErrors) {
+
+ if (memIters != null) {
+ tablet.getTabletMemory().returnIterators(memIters);
+ memIters = null;
+ tablet.getDatafileManager().returnFilesForScan(fileReservationId);
+ fileReservationId = -1;
+ }
+
+ synchronized (tablet) {
+ if (tablet.removeScan(this) == 0)
+ tablet.notifyAll();
+ }
+
+ if (fileManager != null) {
+ fileManager.releaseOpenFiles(sawErrors);
+ fileManager = null;
+ }
+
+ if (statsIterator != null) {
+ statsIterator.report();
+ }
+
+ }
+
+ public void interrupt() {
+ interruptFlag.set(true);
+ }
+
+ @Override
+ public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void reattachFileManager() throws IOException {
+ if (fileManager != null)
+ fileManager.reattach();
+ }
+
+ public void detachFileManager() {
+ if (fileManager != null)
+ fileManager.detach();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
new file mode 100644
index 0000000..9382ea7
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.security.Authorizations;
+
+/**
+ * Holder for the settings of a single tablet scan. All fields are final references. The arrays and collections they
+ * point to are not copied.
+ */
+class ScanOptions {
+
+  /** NOTE(review): presumably a result-count limit; ScanDataSource passes -1 -- confirm against readers of this field. */
+  final int num;
+  final Authorizations authorizations;
+  final byte[] defaultLabels;
+  final Set<Column> columnSet;
+  /** Scan-time iterator list. */
+  final List<IterInfo> ssiList;
+  /** Per-iterator options, keyed by iterator name. */
+  final Map<String,Map<String,String>> ssio;
+  /** Shared flag used to interrupt the scan. */
+  final AtomicBoolean interruptFlag;
+  /** Passed to the scan file manager when files are opened. */
+  final boolean isolated;
+
+  ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
+    this.authorizations = authorizations;
+    this.defaultLabels = defaultLabels;
+    this.columnSet = columnSet;
+    this.ssiList = ssiList;
+    this.ssio = ssio;
+    this.interruptFlag = interruptFlag;
+    this.isolated = isolated;
+    this.num = num;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
new file mode 100644
index 0000000..96379fc
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.log4j.Logger;
+
+public class Scanner {
+ private static final Logger log = Logger.getLogger(Scanner.class);
+
+ private final Tablet tablet;
+ private final ScanOptions options;
+ private Range range;
+ private SortedKeyValueIterator<Key,Value> isolatedIter;
+ private ScanDataSource isolatedDataSource;
+ private boolean sawException = false;
+ private boolean scanClosed = false;
+
+ Scanner(Tablet tablet, Range range, ScanOptions options) {
+ this.tablet = tablet;
+ this.range = range;
+ this.options = options;
+ }
+
+ public synchronized ScanBatch read() throws IOException, TabletClosedException {
+
+ if (sawException)
+ throw new IllegalStateException("Tried to use scanner after exception occurred.");
+
+ if (scanClosed)
+ throw new IllegalStateException("Tried to use scanner after it was closed.");
+
+ Batch results = null;
+
+ ScanDataSource dataSource;
+
+ if (options.isolated) {
+ if (isolatedDataSource == null)
+ isolatedDataSource = new ScanDataSource(tablet, options);
+ dataSource = isolatedDataSource;
+ } else {
+ dataSource = new ScanDataSource(tablet, options);
+ }
+
+ try {
+
+ SortedKeyValueIterator<Key,Value> iter;
+
+ if (options.isolated) {
+ if (isolatedIter == null)
+ isolatedIter = new SourceSwitchingIterator(dataSource, true);
+ else
+ isolatedDataSource.reattachFileManager();
+ iter = isolatedIter;
+ } else {
+ iter = new SourceSwitchingIterator(dataSource, false);
+ }
+
+ results = tablet.nextBatch(iter, range, options.num, options.columnSet);
+
+ if (results.results == null) {
+ range = null;
+ return new ScanBatch(new ArrayList<KVEntry>(), false);
+ } else if (results.continueKey == null) {
+ return new ScanBatch(results.results, false);
+ } else {
+ range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
+ return new ScanBatch(results.results, true);
+ }
+
+ } catch (IterationInterruptedException iie) {
+ sawException = true;
+ if (tablet.isClosed())
+ throw new TabletClosedException(iie);
+ else
+ throw iie;
+ } catch (IOException ioe) {
+ if (tablet.shutdownInProgress()) {
+ log.debug("IOException while shutdown in progress ", ioe);
+ throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
+ }
+
+ sawException = true;
+ dataSource.close(true);
+ throw ioe;
+ } catch (RuntimeException re) {
+ sawException = true;
+ throw re;
+ } finally {
+ // code in finally block because always want
+ // to return mapfiles, even when exception is thrown
+ if (!options.isolated)
+ dataSource.close(false);
+ else
+ dataSource.detachFileManager();
+
+ if (results != null && results.results != null)
+ tablet.updateQueryStats(results.results.size(), results.numBytes);
+ }
+ }
+
+ // close and read are synchronized because can not call close on the data source while it is in use
+ // this could lead to the case where file iterators that are in use by a thread are returned
+ // to the pool... this would be bad
+ public void close() {
+ options.interruptFlag.set(true);
+ synchronized (this) {
+ scanClosed = true;
+ if (isolatedDataSource != null)
+ isolatedDataSource.close(false);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
new file mode 100644
index 0000000..084503a
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.master.state.TServerInstance;
+
+/**
+ * operations are disallowed while we split which is ok since splitting is fast
+ *
+ * a minor compaction should have taken place before calling this so there should be relatively little left to compact
+ *
+ * we just need to make sure major compactions aren't occurring if we have the major compactor thread decide who needs splitting we can avoid synchronization
+ * issues with major compactions
+ *
+ */
+
+public class SplitInfo {
+ final String dir;
+ final SortedMap<FileRef,DataFileValue> datafiles;
+ final String time;
+ final long initFlushID;
+ final long initCompactID;
+ final TServerInstance lastLocation;
+
+ SplitInfo(String d, SortedMap<FileRef,DataFileValue> dfv, String time, long initFlushID, long initCompactID, TServerInstance lastLocation) {
+ this.dir = d;
+ this.datafiles = dfv;
+ this.time = time;
+ this.initFlushID = initFlushID;
+ this.initCompactID = initCompactID;
+ this.lastLocation = lastLocation;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java
new file mode 100644
index 0000000..75cf91e
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import org.apache.hadoop.io.Text;
+
+class SplitRowSpec {
+ final double splitRatio;
+ final Text row;
+
+ SplitRowSpec(double splitRatio, Text row) {
+ this.splitRatio = splitRatio;
+ this.row = row;
+ }
+}
\ No newline at end of file