You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2011/10/03 12:45:32 UTC
svn commit: r1178374 - in /cassandra/branches/cassandra-1.0.0: ./ contrib/
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/compaction/
Author: xedin
Date: Mon Oct 3 10:45:31 2011
New Revision: 1178374
URL: http://svn.apache.org/viewvc?rev=1178374&view=rev
Log:
merge from 0.8
Modified:
cassandra/branches/cassandra-1.0.0/ (props changed)
cassandra/branches/cassandra-1.0.0/CHANGES.txt
cassandra/branches/cassandra-1.0.0/contrib/ (props changed)
cassandra/branches/cassandra-1.0.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Propchange: cassandra/branches/cassandra-1.0.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 3 10:45:31 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7:1026516-1170333,1172024
/cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1176603,1176712,1177149,1177781,1177810,1178297,1178325
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1177149,1177781,1177810,1178297,1178325
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1178374&r1=1178373&r2=1178374&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Mon Oct 3 10:45:31 2011
@@ -22,7 +22,6 @@
* use correct ISortedColumns for time-optimized reads (CASSANDRA-3289)
* don't try delivering hints when there aren't any (CASSANDRA-3176)
-
1.0.0-rc1
* Update CQL to generate microsecond timestamps by default (CASSANDRA-3227)
* Fix counting CFMetadata towards Memtable liveRatio (CASSANDRA-3023)
Propchange: cassandra/branches/cassandra-1.0.0/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 3 10:45:31 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
/cassandra/branches/cassandra-0.7/contrib:1026516-1170333,1172024
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1176603,1176712,1177149,1177781,1177810,1178297,1178325
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1177149,1177781,1177810,1178297,1178325
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
Modified: cassandra/branches/cassandra-1.0.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1178374&r1=1178373&r2=1178374&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-1.0.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Mon Oct 3 10:45:31 2011
@@ -17,11 +17,13 @@
package org.apache.cassandra.hadoop.pig;
import java.io.IOException;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -31,7 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.SuperColumn;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.Deletion;
@@ -46,6 +48,7 @@ import org.apache.pig.*;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.thrift.TDeserializer;
@@ -61,7 +64,7 @@ import org.apache.thrift.transport.TTran
*
* A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
*/
-public class CassandraStorage extends LoadFunc implements StoreFuncInterface
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
// system environment variables that can be set to configure connection info:
// alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
@@ -142,18 +145,14 @@ public class CassandraStorage extends Lo
List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ setTupleValue(pair, 0, marshallers.get(0).compose(name));
if (col instanceof Column)
{
// standard
- pair.set(0, marshallers.get(0).compose(name));
if (validators.get(name) == null)
- // Have to special case BytesType because compose returns a ByteBuffer
- if (marshallers.get(1) instanceof BytesType)
- pair.set(1, new DataByteArray(ByteBufferUtil.getArray(col.value())));
- else
- pair.set(1, marshallers.get(1).compose(col.value()));
+ setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
else
- pair.set(1, validators.get(name).compose(col.value()));
+ setTupleValue(pair, 1, validators.get(name).compose(col.value()));
return pair;
}
@@ -166,6 +165,16 @@ public class CassandraStorage extends Lo
return pair;
}
+ private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+ {
+ if (value instanceof BigInteger)
+ pair.set(position, ((BigInteger) value).intValue());
+ else if (value instanceof ByteBuffer)
+ pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+ else
+ pair.set(position, value);
+ }
+
private CfDef getCfDef(String signature)
{
UDFContext context = UDFContext.getUDFContext();
@@ -293,6 +302,103 @@ public class CassandraStorage extends Lo
initSchema(loadSignature);
}
+ public ResourceSchema getSchema(String location, Job job) throws IOException
+ {
+ setLocation(location, job);
+ CfDef cfDef = getCfDef(loadSignature);
+
+ if (cfDef.column_type.equals("Super"))
+ return null;
+ // top-level schema, no type
+ ResourceSchema schema = new ResourceSchema();
+
+ // add key
+ ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
+ keyFieldSchema.setName("key");
+ keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
+
+ // will become the bag of tuples
+ ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
+ bagFieldSchema.setName("columns");
+ bagFieldSchema.setType(DataType.BAG);
+ ResourceSchema bagSchema = new ResourceSchema();
+
+
+ List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
+
+ // default comparator/validator
+ ResourceSchema innerTupleSchema = new ResourceSchema();
+ ResourceFieldSchema tupleField = new ResourceFieldSchema();
+ tupleField.setType(DataType.TUPLE);
+ tupleField.setSchema(innerTupleSchema);
+
+ ResourceFieldSchema colSchema = new ResourceFieldSchema();
+ colSchema.setName("name");
+ colSchema.setType(getPigType(marshallers.get(0)));
+ tupleFields.add(colSchema);
+
+ ResourceFieldSchema valSchema = new ResourceFieldSchema();
+ AbstractType validator = marshallers.get(1);
+ valSchema.setName("value");
+ valSchema.setType(getPigType(validator));
+ tupleFields.add(valSchema);
+
+ // defined validators/indexes
+ for (ColumnDef cdef : cfDef.column_metadata)
+ {
+ colSchema = new ResourceFieldSchema();
+ colSchema.setName(new String(cdef.getName()));
+ colSchema.setType(getPigType(marshallers.get(0)));
+ tupleFields.add(colSchema);
+
+ valSchema = new ResourceFieldSchema();
+ validator = validators.get(cdef.getName());
+ if (validator == null)
+ validator = marshallers.get(1);
+ valSchema.setName("value");
+ valSchema.setType(getPigType(validator));
+ tupleFields.add(valSchema);
+ }
+ innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
+
+ // a bag can contain only one tuple, but that tuple can contain anything
+ bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
+ bagFieldSchema.setSchema(bagSchema);
+ // top level schema contains everything
+ schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
+ return schema;
+ }
+
+ private byte getPigType(AbstractType type)
+ {
+ if (type instanceof LongType)
+ return DataType.LONG;
+ else if (type instanceof IntegerType)
+ return DataType.INTEGER;
+ else if (type instanceof AsciiType)
+ return DataType.CHARARRAY;
+ else if (type instanceof UTF8Type)
+ return DataType.CHARARRAY;
+ return DataType.BYTEARRAY;
+ }
+
+ public ResourceStatistics getStatistics(String location, Job job)
+ {
+ return null;
+ }
+
+ public String[] getPartitionKeys(String location, Job job)
+ {
+ return null;
+ }
+
+ public void setPartitionFilter(Expression partitionFilter)
+ {
+ // no-op
+ }
+
@Override
public String relativeToAbsolutePath(String location, Path curDir) throws IOException
{
@@ -355,8 +461,6 @@ public class CassandraStorage extends Lo
DefaultDataBag pairs = (DefaultDataBag) t.get(1);
ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
CfDef cfDef = getCfDef(storeSignature);
- List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
try
{
for (Tuple pair : pairs)
@@ -400,15 +504,8 @@ public class CassandraStorage extends Lo
else
{
org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
- column.name = marshallers.get(0).decompose((pair.get(0)));
- if (validators.get(column.name) == null)
- // Have to special case BytesType to convert DataByteArray into ByteBuffer
- if (marshallers.get(1) instanceof BytesType)
- column.value = objToBB(pair.get(1));
- else
- column.value = marshallers.get(1).decompose(pair.get(1));
- else
- column.value = validators.get(column.name).decompose(pair.get(1));
+ column.name = objToBB(pair.get(0));
+ column.value = objToBB(pair.get(1));
column.setTimestamp(System.currentTimeMillis() * 1000);
mutation.column_or_supercolumn = new ColumnOrSuperColumn();
mutation.column_or_supercolumn.column = column;
@@ -528,3 +625,4 @@ public class CassandraStorage extends Lo
return cfDef;
}
}
+
Propchange: cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 3 10:45:31 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1170333,1172024
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1176603,1176712,1177149,1177781,1177810,1178297,1178325
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1177149,1177781,1177810,1178297,1178325
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
Propchange: cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 3 10:45:31 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1170333,1172024
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1176603,1176712,1177149,1177781,1177810,1178297,1178325
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1177149,1177781,1177810,1178297,1178325
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
Propchange: cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 3 10:45:31 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1170333,1172024
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1176603,1176712,1177149,1177781,1177810,1178297,1178325
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1177149,1177781,1177810,1178297,1178325
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
Propchange: cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 3 10:45:31 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1170333,1172024
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1176603,1176712,1177149,1177781,1177810,1178297,1178325
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1177149,1177781,1177810,1178297,1178325
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
Propchange: cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 3 10:45:31 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1170333,1172024
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1176603,1176712,1177149,1177781,1177810,1178297,1178325
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1177149,1177781,1177810,1178297,1178325
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1178374&r1=1178373&r2=1178374&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Mon Oct 3 10:45:31 2011
@@ -486,6 +486,8 @@ public class CompactionManager implement
String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
RandomAccessReader indexFile = RandomAccessReader.open(new File(indexFilename), true);
+ ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
+
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -495,12 +497,19 @@ public class CompactionManager implement
assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
}
- // errors when creating the writer may leave empty temp files.
- SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, Collections.singletonList(sstable));
SSTableReader newSstable = null;
- executor.beginCompaction(new ScrubInfo(dataFile, sstable));
+
+ // errors when creating the writer may leave empty temp files.
+ SSTableWriter writer = maybeCreateWriter(cfs,
+ compactionFileLocation,
+ expectedBloomFilterSize,
+ null,
+ Collections.singletonList(sstable));
+
int goodRows = 0, badRows = 0, emptyRows = 0;
+ executor.beginCompaction(scrubInfo);
+
try
{
while (!dataFile.isEOF())
@@ -649,6 +658,8 @@ public class CompactionManager implement
{
FileUtils.closeQuietly(dataFile);
FileUtils.closeQuietly(indexFile);
+
+ executor.finishCompaction(scrubInfo);
}
}