You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/05 06:42:57 UTC
[15/35] git commit: ACCUMULO-2041 extract tablet classes to new files,
move tablet-related code to o.a.a.tserver.tablet,
make member variables private
ACCUMULO-2041 extract tablet classes to new files, move tablet-related code to o.a.a.tserver.tablet, make member variables private
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7db2abf1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7db2abf1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7db2abf1
Branch: refs/heads/ACCUMULO-378
Commit: 7db2abf19c2e0585b2f3ea32068c3d62bd891590
Parents: 9e770ca
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Apr 21 13:12:13 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Tue Jun 3 10:49:43 2014 -0400
----------------------------------------------------------------------
.../accumulo/tserver/CompactionStats.java | 59 -
.../accumulo/tserver/CompactionWatcher.java | 110 -
.../org/apache/accumulo/tserver/Compactor.java | 548 ---
.../apache/accumulo/tserver/FileManager.java | 12 +-
.../apache/accumulo/tserver/InMemoryMap.java | 2 +-
.../accumulo/tserver/MinorCompactionReason.java | 21 +
.../apache/accumulo/tserver/MinorCompactor.java | 146 -
.../java/org/apache/accumulo/tserver/Rate.java | 60 -
.../org/apache/accumulo/tserver/RootFiles.java | 133 -
.../tserver/TConstraintViolationException.java | 54 +
.../org/apache/accumulo/tserver/Tablet.java | 3810 ------------------
.../tserver/TabletIteratorEnvironment.java | 8 +-
.../apache/accumulo/tserver/TabletServer.java | 79 +-
.../tserver/TabletServerResourceManager.java | 67 +-
.../accumulo/tserver/TabletStatsKeeper.java | 5 +
.../tserver/log/TabletServerLogger.java | 4 +-
.../apache/accumulo/tserver/tablet/Batch.java | 35 +
.../accumulo/tserver/tablet/CommitSession.java | 121 +
.../accumulo/tserver/tablet/CompactionInfo.java | 113 +
.../tserver/tablet/CompactionRunner.java | 76 +
.../tserver/tablet/CompactionStats.java | 59 +
.../tserver/tablet/CompactionWatcher.java | 109 +
.../accumulo/tserver/tablet/Compactor.java | 477 +++
.../tserver/tablet/DatafileManager.java | 581 +++
.../apache/accumulo/tserver/tablet/KVEntry.java | 39 +
.../tserver/tablet/MinorCompactionTask.java | 96 +
.../accumulo/tserver/tablet/MinorCompactor.java | 145 +
.../apache/accumulo/tserver/tablet/Rate.java | 60 +
.../accumulo/tserver/tablet/RootFiles.java | 133 +
.../accumulo/tserver/tablet/ScanBatch.java | 29 +
.../accumulo/tserver/tablet/ScanDataSource.java | 222 +
.../accumulo/tserver/tablet/ScanOptions.java | 51 +
.../apache/accumulo/tserver/tablet/Scanner.java | 135 +
.../accumulo/tserver/tablet/SplitInfo.java | 52 +
.../accumulo/tserver/tablet/SplitRowSpec.java | 29 +
.../apache/accumulo/tserver/tablet/Tablet.java | 2564 ++++++++++++
.../tserver/tablet/TabletClosedException.java | 29 +
.../tserver/tablet/TabletCommitter.java | 48 +
.../accumulo/tserver/tablet/TabletMemory.java | 190 +
.../accumulo/tserver/CountingIteratorTest.java | 2 +-
.../apache/accumulo/tserver/RootFilesTest.java | 1 +
41 files changed, 5561 insertions(+), 4953 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java
deleted file mode 100644
index d359e95..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-public class CompactionStats {
- private long entriesRead;
- private long entriesWritten;
- private long fileSize;
-
- CompactionStats(long er, long ew) {
- this.setEntriesRead(er);
- this.setEntriesWritten(ew);
- }
-
- public CompactionStats() {}
-
- private void setEntriesRead(long entriesRead) {
- this.entriesRead = entriesRead;
- }
-
- public long getEntriesRead() {
- return entriesRead;
- }
-
- private void setEntriesWritten(long entriesWritten) {
- this.entriesWritten = entriesWritten;
- }
-
- public long getEntriesWritten() {
- return entriesWritten;
- }
-
- public void add(CompactionStats mcs) {
- this.entriesRead += mcs.entriesRead;
- this.entriesWritten += mcs.entriesWritten;
- }
-
- public void setFileSize(long fileSize) {
- this.fileSize = fileSize;
- }
-
- public long getFileSize() {
- return this.fileSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
deleted file mode 100644
index 2e4d7b7..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.tserver.Compactor.CompactionInfo;
-import org.apache.log4j.Logger;
-
-/**
- *
- */
-public class CompactionWatcher implements Runnable {
- private Map<List<Long>,ObservedCompactionInfo> observedCompactions = new HashMap<List<Long>,ObservedCompactionInfo>();
- private AccumuloConfiguration config;
- private static boolean watching = false;
-
- private static class ObservedCompactionInfo {
- CompactionInfo compactionInfo;
- long firstSeen;
- boolean loggedWarning;
-
- ObservedCompactionInfo(CompactionInfo ci, long time) {
- this.compactionInfo = ci;
- this.firstSeen = time;
- }
- }
-
- public CompactionWatcher(AccumuloConfiguration config) {
- this.config = config;
- }
-
- public void run() {
- List<CompactionInfo> runningCompactions = Compactor.getRunningCompactions();
-
- Set<List<Long>> newKeys = new HashSet<List<Long>>();
-
- long time = System.currentTimeMillis();
-
- for (CompactionInfo ci : runningCompactions) {
- List<Long> compactionKey = Arrays.asList(ci.getID(), ci.getEntriesRead(), ci.getEntriesWritten());
- newKeys.add(compactionKey);
-
- if (!observedCompactions.containsKey(compactionKey)) {
- observedCompactions.put(compactionKey, new ObservedCompactionInfo(ci, time));
- }
- }
-
- // look for compactions that finished or made progress and logged a warning
- HashMap<List<Long>,ObservedCompactionInfo> copy = new HashMap<List<Long>,ObservedCompactionInfo>(observedCompactions);
- copy.keySet().removeAll(newKeys);
-
- for (ObservedCompactionInfo oci : copy.values()) {
- if (oci.loggedWarning) {
- Logger.getLogger(CompactionWatcher.class).info("Compaction of " + oci.compactionInfo.getExtent() + " is no longer stuck");
- }
- }
-
- // remove any compaction that completed or made progress
- observedCompactions.keySet().retainAll(newKeys);
-
- long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME);
-
- // check for stuck compactions
- for (ObservedCompactionInfo oci : observedCompactions.values()) {
- if (time - oci.firstSeen > warnTime && !oci.loggedWarning) {
- Thread compactionThread = oci.compactionInfo.getThread();
- if (compactionThread != null) {
- StackTraceElement[] trace = compactionThread.getStackTrace();
- Exception e = new Exception("Possible stack trace of compaction stuck on " + oci.compactionInfo.getExtent());
- e.setStackTrace(trace);
- Logger.getLogger(CompactionWatcher.class).warn(
- "Compaction of " + oci.compactionInfo.getExtent() + " to " + oci.compactionInfo.getOutputFile() + " has not made progress for at least "
- + (time - oci.firstSeen) + "ms", e);
- oci.loggedWarning = true;
- }
- }
- }
- }
-
- public static synchronized void startWatching(AccumuloConfiguration config) {
- if (!watching) {
- SimpleTimer.getInstance(config).schedule(new CompactionWatcher(config), 10000, 10000);
- watching = true;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
deleted file mode 100644
index 822171c..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
+++ /dev/null
@@ -1,548 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.thrift.IterInfo;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.WrappingIterator;
-import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
-import org.apache.accumulo.core.iterators.system.MultiIterator;
-import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
-import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
-import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
-import org.apache.accumulo.core.util.LocalityGroupUtil;
-import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReportingIterator;
-import org.apache.accumulo.server.problems.ProblemReports;
-import org.apache.accumulo.server.problems.ProblemType;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-
-public class Compactor implements Callable<CompactionStats> {
-
- public static class CountingIterator extends WrappingIterator {
-
- private long count;
- private ArrayList<CountingIterator> deepCopies;
- private AtomicLong entriesRead;
-
- @Override
- public CountingIterator deepCopy(IteratorEnvironment env) {
- return new CountingIterator(this, env);
- }
-
- private CountingIterator(CountingIterator other, IteratorEnvironment env) {
- setSource(other.getSource().deepCopy(env));
- count = 0;
- this.deepCopies = other.deepCopies;
- this.entriesRead = other.entriesRead;
- deepCopies.add(this);
- }
-
- public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) {
- deepCopies = new ArrayList<Compactor.CountingIterator>();
- this.setSource(source);
- count = 0;
- this.entriesRead = entriesRead;
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void next() throws IOException {
- super.next();
- count++;
- if (count % 1024 == 0) {
- entriesRead.addAndGet(1024);
- }
- }
-
- public long getCount() {
- long sum = 0;
- for (CountingIterator dc : deepCopies) {
- sum += dc.count;
- }
-
- return count + sum;
- }
- }
-
- private static final Logger log = Logger.getLogger(Compactor.class);
-
- static class CompactionCanceledException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
- interface CompactionEnv {
- boolean isCompactionEnabled();
-
- IteratorScope getIteratorScope();
- }
-
- private Map<FileRef,DataFileValue> filesToCompact;
- private InMemoryMap imm;
- private FileRef outputFile;
- private boolean propogateDeletes;
- private AccumuloConfiguration acuTableConf;
- private CompactionEnv env;
- private Configuration conf;
- private VolumeManager fs;
- protected KeyExtent extent;
- private List<IteratorSetting> iterators;
-
- // things to report
- private String currentLocalityGroup = "";
- private long startTime;
-
- private MajorCompactionReason reason;
- protected MinorCompactionReason mincReason;
-
- private AtomicLong entriesRead = new AtomicLong(0);
- private AtomicLong entriesWritten = new AtomicLong(0);
- private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
-
- private static AtomicLong nextCompactorID = new AtomicLong(0);
-
- // a unique id to identify a compactor
- private long compactorID = nextCompactorID.getAndIncrement();
-
- protected volatile Thread thread;
-
- private synchronized void setLocalityGroup(String name) {
- this.currentLocalityGroup = name;
- }
-
- private void clearStats() {
- entriesRead.set(0);
- entriesWritten.set(0);
- }
-
- protected static final Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
-
- public static class CompactionInfo {
-
- private Compactor compactor;
- private String localityGroup;
- private long entriesRead;
- private long entriesWritten;
-
- CompactionInfo(Compactor compactor) {
- this.localityGroup = compactor.currentLocalityGroup;
- this.entriesRead = compactor.entriesRead.get();
- this.entriesWritten = compactor.entriesWritten.get();
- this.compactor = compactor;
- }
-
- public long getID() {
- return compactor.compactorID;
- }
-
- public KeyExtent getExtent() {
- return compactor.getExtent();
- }
-
- public long getEntriesRead() {
- return entriesRead;
- }
-
- public long getEntriesWritten() {
- return entriesWritten;
- }
-
- public Thread getThread() {
- return compactor.thread;
- }
-
- public String getOutputFile() {
- return compactor.getOutputFile();
- }
-
- public ActiveCompaction toThrift() {
-
- CompactionType type;
-
- if (compactor.imm != null)
- if (compactor.filesToCompact.size() > 0)
- type = CompactionType.MERGE;
- else
- type = CompactionType.MINOR;
- else if (!compactor.propogateDeletes)
- type = CompactionType.FULL;
- else
- type = CompactionType.MAJOR;
-
- CompactionReason reason;
-
- if (compactor.imm != null)
- switch (compactor.mincReason) {
- case USER:
- reason = CompactionReason.USER;
- break;
- case CLOSE:
- reason = CompactionReason.CLOSE;
- break;
- case SYSTEM:
- default:
- reason = CompactionReason.SYSTEM;
- break;
- }
- else
- switch (compactor.reason) {
- case USER:
- reason = CompactionReason.USER;
- break;
- case CHOP:
- reason = CompactionReason.CHOP;
- break;
- case IDLE:
- reason = CompactionReason.IDLE;
- break;
- case NORMAL:
- default:
- reason = CompactionReason.SYSTEM;
- break;
- }
-
- List<IterInfo> iiList = new ArrayList<IterInfo>();
- Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
-
- for (IteratorSetting iterSetting : compactor.iterators) {
- iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
- iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
- }
- List<String> filesToCompact = new ArrayList<String>();
- for (FileRef ref : compactor.filesToCompact.keySet())
- filesToCompact.add(ref.toString());
- return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact,
- compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
- }
- }
-
- public static List<CompactionInfo> getRunningCompactions() {
- ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>();
-
- synchronized (runningCompactions) {
- for (Compactor compactor : runningCompactions) {
- compactions.add(new CompactionInfo(compactor));
- }
- }
-
- return compactions;
- }
-
- Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
- AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
- this.extent = extent;
- this.conf = conf;
- this.fs = fs;
- this.filesToCompact = files;
- this.imm = imm;
- this.outputFile = outputFile;
- this.propogateDeletes = propogateDeletes;
- this.acuTableConf = acuTableConf;
- this.env = env;
- this.iterators = iterators;
- this.reason = reason;
-
- startTime = System.currentTimeMillis();
- }
-
- Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
- AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
- this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
- }
-
- public VolumeManager getFileSystem() {
- return fs;
- }
-
- KeyExtent getExtent() {
- return extent;
- }
-
- String getOutputFile() {
- return outputFile.toString();
- }
-
- @Override
- public CompactionStats call() throws IOException, CompactionCanceledException {
-
- FileSKVWriter mfw = null;
-
- CompactionStats majCStats = new CompactionStats();
-
- boolean remove = runningCompactions.add(this);
-
- clearStats();
-
- String oldThreadName = Thread.currentThread().getName();
- String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile;
- Thread.currentThread().setName(newThreadName);
- thread = Thread.currentThread();
- try {
- FileOperations fileFactory = FileOperations.getInstance();
- FileSystem ns = this.fs.getVolumeByPath(outputFile.path()).getFileSystem();
- mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
-
- Map<String,Set<ByteSequence>> lGroups;
- try {
- lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
- } catch (LocalityGroupConfigurationError e) {
- throw new IOException(e);
- }
-
- long t1 = System.currentTimeMillis();
-
- HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
-
- if (mfw.supportsLocalityGroups()) {
- for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
- setLocalityGroup(entry.getKey());
- compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
- allColumnFamilies.addAll(entry.getValue());
- }
- }
-
- setLocalityGroup("");
- compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
-
- long t2 = System.currentTimeMillis();
-
- FileSKVWriter mfwTmp = mfw;
- mfw = null; // set this to null so we do not try to close it again in finally if the close fails
- mfwTmp.close(); // if the close fails it will cause the compaction to fail
-
- // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
- try {
- FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
- openReader.close();
- } catch (IOException ex) {
- log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
- throw ex;
- }
-
- log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
- majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
-
- majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
- return majCStats;
- } catch (IOException e) {
- log.error(e, e);
- throw e;
- } catch (RuntimeException e) {
- log.error(e, e);
- throw e;
- } finally {
- Thread.currentThread().setName(oldThreadName);
- if (remove) {
- thread = null;
- runningCompactions.remove(this);
- }
-
- try {
- if (mfw != null) {
- // compaction must not have finished successfully, so close its output file
- try {
- mfw.close();
- } finally {
- if (!fs.deleteRecursively(outputFile.path()))
- if (fs.exists(outputFile.path()))
- log.error("Unable to delete " + outputFile);
- }
- }
- } catch (IOException e) {
- log.warn(e, e);
- } catch (RuntimeException exception) {
- log.warn(exception, exception);
- }
- }
- }
-
- private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException {
-
- List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
-
- for (FileRef mapFile : filesToCompact.keySet()) {
- try {
-
- FileOperations fileFactory = FileOperations.getInstance();
- FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem();
- FileSKVIterator reader;
-
- reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf);
-
- readers.add(reader);
-
- SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
-
- if (filesToCompact.get(mapFile).isTimeSet()) {
- iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
- }
-
- iters.add(iter);
-
- } catch (Throwable e) {
-
- ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
-
- log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
- // failed to open some map file... close the ones that were opened
- for (FileSKVIterator reader : readers) {
- try {
- reader.close();
- } catch (Throwable e2) {
- log.warn("Failed to close map file", e2);
- }
- }
-
- readers.clear();
-
- if (e instanceof IOException)
- throw (IOException) e;
- throw new IOException("Failed to open map data files", e);
- }
- }
-
- return iters;
- }
-
- private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
- throws IOException, CompactionCanceledException {
- ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
- Span span = Trace.start("compact");
- try {
- long entriesCompacted = 0;
- List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers);
-
- if (imm != null) {
- iters.add(imm.compactionIterator());
- }
-
- CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead);
- DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
- ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-
- // if(env.getIteratorScope() )
-
- TabletIteratorEnvironment iterEnv;
- if (env.getIteratorScope() == IteratorScope.majc)
- iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf);
- else if (env.getIteratorScope() == IteratorScope.minc)
- iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf);
- else
- throw new IllegalArgumentException();
-
- SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf,
- iterators, iterEnv));
-
- itr.seek(extent.toDataRange(), columnFamilies, inclusive);
-
- if (!inclusive) {
- mfw.startDefaultLocalityGroup();
- } else {
- mfw.startNewLocalityGroup(lgName, columnFamilies);
- }
-
- Span write = Trace.start("write");
- try {
- while (itr.hasTop() && env.isCompactionEnabled()) {
- mfw.append(itr.getTopKey(), itr.getTopValue());
- itr.next();
- entriesCompacted++;
-
- if (entriesCompacted % 1024 == 0) {
- // Periodically update stats, do not want to do this too often since its volatile
- entriesWritten.addAndGet(1024);
- }
- }
-
- if (itr.hasTop() && !env.isCompactionEnabled()) {
- // cancel major compaction operation
- try {
- try {
- mfw.close();
- } catch (IOException e) {
- log.error(e, e);
- }
- fs.deleteRecursively(outputFile.path());
- } catch (Exception e) {
- log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
- }
- throw new CompactionCanceledException();
- }
-
- } finally {
- CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
- majCStats.add(lgMajcStats);
- write.stop();
- }
-
- } finally {
- // close sequence files opened
- for (FileSKVIterator reader : readers) {
- try {
- reader.close();
- } catch (Throwable e) {
- log.warn("Failed to close map file", e);
- }
- }
- span.stop();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index e8958b1..017398e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -161,7 +161,7 @@ public class FileManager {
* @param indexCache
* : underlying file can and should be able to handle a null cache
*/
- FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
+ public FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
if (maxOpen <= 0)
throw new IllegalArgumentException("maxOpen <= 0");
@@ -481,7 +481,7 @@ public class FileManager {
return newlyReservedReaders;
}
- synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
+ public synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
@@ -509,7 +509,7 @@ public class FileManager {
return iters;
}
- synchronized void detach() {
+ public synchronized void detach() {
releaseReaders(tabletReservedReaders, false);
tabletReservedReaders.clear();
@@ -518,7 +518,7 @@ public class FileManager {
fds.unsetIterator();
}
- synchronized void reattach() throws IOException {
+ public synchronized void reattach() throws IOException {
if (tabletReservedReaders.size() != 0)
throw new IllegalStateException();
@@ -545,13 +545,13 @@ public class FileManager {
}
}
- synchronized void releaseOpenFiles(boolean sawIOException) {
+ public synchronized void releaseOpenFiles(boolean sawIOException) {
releaseReaders(tabletReservedReaders, sawIOException);
tabletReservedReaders.clear();
dataSources.clear();
}
- synchronized int getNumOpenFiles() {
+ public synchronized int getNumOpenFiles() {
return tabletReservedReaders.size();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index dc36718..3c9c32c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -567,7 +567,7 @@ public class InMemoryMap {
}
- class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
+ public class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
private AtomicBoolean closed;
private SourceSwitchingIterator ssi;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java
new file mode 100644
index 0000000..25cfd9b
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+public enum MinorCompactionReason {
+ USER, SYSTEM, CLOSE
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java
deleted file mode 100644
index b2e84e5..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReports;
-import org.apache.accumulo.server.problems.ProblemType;
-import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-public class MinorCompactor extends Compactor {
-
- private static final Logger log = Logger.getLogger(MinorCompactor.class);
-
- private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();
-
- private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) {
- if (mergeFile == null)
- return EMPTY_MAP;
-
- return Collections.singletonMap(mergeFile, dfv);
- }
-
- MinorCompactor(Configuration conf, VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf,
- KeyExtent extent, MinorCompactionReason mincReason) {
- super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() {
-
- @Override
- public boolean isCompactionEnabled() {
- return true;
- }
-
- @Override
- public IteratorScope getIteratorScope() {
- return IteratorScope.minc;
- }
- });
-
- super.mincReason = mincReason;
- }
-
- private boolean isTableDeleting() {
- try {
- return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString()) == TableState.DELETING;
- } catch (Exception e) {
- log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ", e);
- return false; // can not get positive confirmation that its deleting.
- }
- }
-
- @Override
- public CompactionStats call() {
- log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());
-
- // output to new MapFile with a temporary name
- int sleepTime = 100;
- double growthFactor = 4;
- int maxSleepTime = 1000 * 60 * 3; // 3 minutes
- boolean reportedProblem = false;
-
- runningCompactions.add(this);
- try {
- do {
- try {
- CompactionStats ret = super.call();
-
- // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f secs | %,d bytes ",map.size(), entriesCompacted,
- // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes()));
-
- if (reportedProblem) {
- ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile());
- }
-
- return ret;
- } catch (IOException e) {
- log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...");
- ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
- reportedProblem = true;
- } catch (RuntimeException e) {
- // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the
- // minor compaction would succeed
- log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e);
- ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
- reportedProblem = true;
- } catch (CompactionCanceledException e) {
- throw new IllegalStateException(e);
- }
-
- Random random = new Random();
-
- int sleep = sleepTime + random.nextInt(sleepTime);
- log.debug("MinC failed sleeping " + sleep + " ms before retrying");
- UtilWaitThread.sleep(sleep);
- sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));
-
- // clean up
- try {
- if (getFileSystem().exists(new Path(getOutputFile()))) {
- getFileSystem().deleteRecursively(new Path(getOutputFile()));
- }
- } catch (IOException e) {
- log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
- }
-
- if (isTableDeleting())
- return new CompactionStats(0, 0);
-
- } while (true);
- } finally {
- thread = null;
- runningCompactions.remove(this);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java
deleted file mode 100644
index b0ed9ee..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-public class Rate {
- private long lastCounter = -1;
- private long lastTime = -1;
- private double current = 0.0;
- final double ratio;
-
- /**
- * Turn a counter into an exponentially smoothed rate over time.
- *
- * @param ratio
- * the rate at which each update influences the curve; must be (0., 1.0)
- */
- public Rate(double ratio) {
- if (ratio <= 0. || ratio >= 1.0)
- throw new IllegalArgumentException("ratio must be > 0. and < 1.0");
- this.ratio = ratio;
- }
-
- public double update(long counter) {
- return update(System.currentTimeMillis(), counter);
- }
-
- synchronized public double update(long when, long counter) {
- if (lastCounter < 0) {
- lastTime = when;
- lastCounter = counter;
- return current;
- }
- if (lastTime == when) {
- throw new IllegalArgumentException("update time < last value");
- }
- double keep = 1. - ratio;
- current = (keep * current + ratio * ((counter - lastCounter)) * 1000. / (when - lastTime));
- lastTime = when;
- lastCounter = counter;
- return current;
- }
-
- synchronized public double rate() {
- return this.current;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/RootFiles.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/RootFiles.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/RootFiles.java
deleted file mode 100644
index f23c55d..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/RootFiles.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- *
- */
-public class RootFiles {
-
- private static Logger log = Logger.getLogger(RootFiles.class);
-
- static void prepareReplacement(VolumeManager fs, Path location, Set<FileRef> oldDatafiles, String compactName) throws IOException {
- for (FileRef ref : oldDatafiles) {
- Path path = ref.path();
- Tablet.rename(fs, path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
- }
- }
-
- static void renameReplacement(VolumeManager fs, FileRef tmpDatafile, FileRef newDatafile) throws IOException {
- if (fs.exists(newDatafile.path())) {
- log.error("Target map file already exist " + newDatafile, new Exception());
- throw new IllegalStateException("Target map file already exist " + newDatafile);
- }
-
- Tablet.rename(fs, tmpDatafile.path(), newDatafile.path());
- }
-
- static void finishReplacement(AccumuloConfiguration acuTableConf, VolumeManager fs, Path location, Set<FileRef> oldDatafiles, String compactName)
- throws IOException {
- // start deleting files, if we do not finish they will be cleaned
- // up later
- for (FileRef ref : oldDatafiles) {
- Path path = ref.path();
- Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
- if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile))
- fs.deleteRecursively(deleteFile);
- }
- }
-
- public static void replaceFiles(AccumuloConfiguration acuTableConf, VolumeManager fs, Path location, Set<FileRef> oldDatafiles, FileRef tmpDatafile,
- FileRef newDatafile) throws IOException {
- String compactName = newDatafile.path().getName();
-
- prepareReplacement(fs, location, oldDatafiles, compactName);
- renameReplacement(fs, tmpDatafile, newDatafile);
- finishReplacement(acuTableConf, fs, location, oldDatafiles, compactName);
- }
-
- public static Collection<String> cleanupReplacement(VolumeManager fs, FileStatus[] files, boolean deleteTmp) throws IOException {
- /*
- * called in constructor and before major compactions
- */
- Collection<String> goodFiles = new ArrayList<String>(files.length);
-
- for (FileStatus file : files) {
-
- String path = file.getPath().toString();
- if (file.getPath().toUri().getScheme() == null) {
- // depending on the behavior of HDFS, if list status does not return fully qualified volumes then could switch to the default volume
- throw new IllegalArgumentException("Require fully qualified paths " + file.getPath());
- }
-
- String filename = file.getPath().getName();
-
- // check for incomplete major compaction, this should only occur
- // for root tablet
- if (filename.startsWith("delete+")) {
- String expectedCompactedFile = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];
- if (fs.exists(new Path(expectedCompactedFile))) {
- // compaction finished, but did not finish deleting compacted files.. so delete it
- if (!fs.deleteRecursively(file.getPath()))
- log.warn("Delete of file: " + file.getPath().toString() + " return false");
- continue;
- }
- // compaction did not finish, so put files back
-
- // reset path and filename for rest of loop
- filename = filename.split("\\+", 3)[2];
- path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename;
-
- Tablet.rename(fs, file.getPath(), new Path(path));
- }
-
- if (filename.endsWith("_tmp")) {
- if (deleteTmp) {
- log.warn("cleaning up old tmp file: " + path);
- if (!fs.deleteRecursively(file.getPath()))
- log.warn("Delete of tmp file: " + file.getPath().toString() + " return false");
-
- }
- continue;
- }
-
- if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
- log.error("unknown file in tablet" + path);
- continue;
- }
-
- goodFiles.add(path);
- }
-
- return goodFiles;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java
new file mode 100644
index 0000000..83fc43e
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.List;
+
+import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+
+public class TConstraintViolationException extends Exception {
+ private static final long serialVersionUID = 1L;
+ private final Violations violations;
+ private final List<Mutation> violators;
+ private final List<Mutation> nonViolators;
+ private final CommitSession commitSession;
+
+ public TConstraintViolationException(Violations violations, List<Mutation> violators, List<Mutation> nonViolators, CommitSession commitSession) {
+ this.violations = violations;
+ this.violators = violators;
+ this.nonViolators = nonViolators;
+ this.commitSession = commitSession;
+ }
+
+ Violations getViolations() {
+ return violations;
+ }
+
+ List<Mutation> getViolators() {
+ return violators;
+ }
+
+ List<Mutation> getNonViolators() {
+ return nonViolators;
+ }
+
+ CommitSession getCommitSession() {
+ return commitSession;
+ }
+}
\ No newline at end of file