You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2021/03/20 00:55:14 UTC
[accumulo] branch 1451-external-compactions-feature updated:
Persist external compactions in metadata table
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new 81fee5c Persist external compactions in metadata table
81fee5c is described below
commit 81fee5c19340c8cd794178365263b661365425b1
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri Mar 19 19:58:48 2021 -0400
Persist external compactions in metadata table
---
.../accumulo/core/metadata/schema/Ample.java | 5 +
.../metadata/schema}/ExternalCompactionId.java | 2 +-
.../schema/ExternalCompactionMetadata.java | 121 +++++++++++++++++++++
.../core/metadata/schema/MetadataSchema.java | 5 +
.../core/metadata/schema/TabletMetadata.java | 18 ++-
.../core/metadata/schema/TabletsMetadata.java | 4 +
.../core/spi/compaction/CompactionExecutorId.java | 1 +
.../core/util/compaction/CompactionJobImpl.java | 20 +++-
.../server/constraints/MetadataConstraints.java | 4 +-
.../server/metadata/TabletMutatorBase.java | 17 +++
.../accumulo/server/util/ManagerMetadataUtil.java | 7 +-
.../coordinator/CompactionCoordinator.java | 2 +-
.../accumulo/tserver/ThriftClientHandler.java | 2 +-
.../accumulo/tserver/compactions/Compactable.java | 2 +-
.../tserver/compactions/CompactionManager.java | 6 +-
.../tserver/compactions/ExternalCompactionJob.java | 2 +-
.../accumulo/tserver/tablet/CompactableImpl.java | 105 ++++++++++++------
.../accumulo/tserver/tablet/CompactableUtils.java | 2 +-
.../accumulo/tserver/tablet/DatafileManager.java | 8 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 20 +++-
.../apache/accumulo/tserver/tablet/TabletData.java | 10 ++
21 files changed, 312 insertions(+), 51 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 1860122..334dac2 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -270,6 +270,11 @@ public interface Ample {
TabletMutator deleteSuspension();
+ TabletMutator putExternalCompaction(ExternalCompactionId ecid,
+ ExternalCompactionMetadata ecMeta);
+
+ TabletMutator deleteExternalCompaction(ExternalCompactionId ecid);
+
/**
* This method persist (or queues for persisting) previous put and deletes against this object.
* Unless this method is called, previous calls will never be persisted. The purpose of this
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java
similarity index 97%
rename from server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java
rename to core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java
index e6880c4..c5ae588 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.server.compaction;
+package org.apache.accumulo.core.metadata.schema;
import java.util.UUID;
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java
new file mode 100644
index 0000000..6398bf7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.metadata.schema;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.hadoop.fs.Path;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class ExternalCompactionMetadata {
+
+ private static final Gson GSON = new GsonBuilder().create();
+
+ private final Set<StoredTabletFile> jobFiles;
+ private final TabletFile compactTmpName;
+ private final TabletFile newFile;
+ private final String compactorId;
+ private final CompactionKind kind;
+ private final long priority;
+ private final CompactionExecutorId ceid;
+
+ public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles, TabletFile compactTmpName,
+ TabletFile newFile, String compactorId, CompactionKind kind, long priority,
+ CompactionExecutorId ceid) {
+ this.jobFiles = Objects.requireNonNull(jobFiles);
+ this.compactTmpName = Objects.requireNonNull(compactTmpName);
+ this.newFile = Objects.requireNonNull(newFile);
+ this.compactorId = Objects.requireNonNull(compactorId);
+ this.kind = Objects.requireNonNull(kind);
+ this.priority = priority;
+ this.ceid = Objects.requireNonNull(ceid);
+ }
+
+ public Set<StoredTabletFile> getJobFiles() {
+ return jobFiles;
+ }
+
+ public TabletFile getCompactTmpName() {
+ return compactTmpName;
+ }
+
+ public TabletFile getNewFile() {
+ return newFile;
+ }
+
+ public String getCompactorId() {
+ return compactorId;
+ }
+
+ public CompactionKind getKind() {
+ return kind;
+ }
+
+ public long getPriority() {
+ return priority;
+ }
+
+ public CompactionExecutorId getCompactionExecutorId() {
+ return ceid;
+ }
+
+ // This class is used to serialize and deserialize this class using GSon. Any changes to this
+ // class must consider persisted data.
+ private static class GSonData {
+ List<String> inputs;
+ String tmp;
+ String dest;
+ String compactor;
+ String kind;
+ String executorId;
+ long priority;
+ }
+
+ public String toJson() {
+ GSonData jData = new GSonData();
+ jData.inputs = jobFiles.stream().map(StoredTabletFile::getMetaUpdateDelete).collect(toList());
+ jData.tmp = compactTmpName.getMetaInsert();
+ jData.dest = newFile.getMetaInsert();
+ jData.compactor = compactorId;
+ jData.kind = kind.name();
+ jData.executorId = ceid.getExernalName();
+ jData.priority = priority;
+ return GSON.toJson(jData);
+ }
+
+ public static ExternalCompactionMetadata fromJson(String json) {
+ GSonData jData = GSON.fromJson(json, GSonData.class);
+ return new ExternalCompactionMetadata(
+ jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()),
+ new TabletFile(new Path(jData.tmp)), new TabletFile(new Path(jData.dest)), jData.compactor,
+ CompactionKind.valueOf(jData.kind), jData.priority,
+ CompactionExecutorId.externalId(jData.executorId));
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 795ea03..02f8470 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -327,6 +327,11 @@ public class MetadataSchema {
public static final Text NAME = new Text("chopped");
public static final ColumnFQ CHOPPED_COLUMN = new ColumnFQ(NAME, new Text("chopped"));
}
+
+ public static class ExternalCompactionColumnFamily {
+ public static final String STR_NAME = "ecomp";
+ public static final Text NAME = new Text(STR_NAME);
+ }
}
/**
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index ee5283c..e075eff 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.core.metadata.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_QUAL;
@@ -62,11 +61,13 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Bu
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.HostAndPort;
@@ -109,6 +110,7 @@ public class TabletMetadata {
private List<LogEntry> logs;
private OptionalLong compact = OptionalLong.empty();
private Double splitRatio = null;
+ private Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions;
public enum LocationType {
CURRENT, FUTURE, LAST
@@ -129,7 +131,8 @@ public class TabletMetadata {
LOGS,
COMPACT_ID,
SPLIT_RATIO,
- SUSPEND
+ SUSPEND,
+ ECOMP
}
public static class Location extends TServerInstance {
@@ -292,6 +295,11 @@ public class TabletMetadata {
}
}
+ public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactions() {
+ ensureFetched(ColumnType.ECOMP);
+ return extCompactions;
+ }
+
@VisibleForTesting
public static TabletMetadata convertRow(Iterator<Entry<Key,Value>> rowIter,
EnumSet<ColumnType> fetchedColumns, boolean buildKeyValueMap) {
@@ -306,6 +314,7 @@ public class TabletMetadata {
var filesBuilder = ImmutableMap.<StoredTabletFile,DataFileValue>builder();
var scansBuilder = ImmutableList.<StoredTabletFile>builder();
var logsBuilder = ImmutableList.<LogEntry>builder();
+ var extCompBuilder = ImmutableMap.<ExternalCompactionId,ExternalCompactionMetadata>builder();
final var loadedFilesBuilder = ImmutableMap.<TabletFile,Long>builder();
ByteSequence row = null;
@@ -392,6 +401,10 @@ public class TabletMetadata {
case LogColumnFamily.STR_NAME:
logsBuilder.add(LogEntry.fromMetaWalEntry(kv));
break;
+ case ExternalCompactionColumnFamily.STR_NAME:
+ extCompBuilder.put(ExternalCompactionId.of(qual),
+ ExternalCompactionMetadata.fromJson(val));
+ break;
default:
throw new IllegalStateException("Unexpected family " + fam);
}
@@ -402,6 +415,7 @@ public class TabletMetadata {
te.fetchedCols = fetchedColumns;
te.scans = scansBuilder.build();
te.logs = logsBuilder.build();
+ te.extCompactions = extCompBuilder.build();
if (buildKeyValueMap) {
te.keyValues = kvBuilder.build();
}
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index f67f81a..1d6fc89 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -51,6 +51,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Bu
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
@@ -198,6 +199,9 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
case TIME:
qualifiers.add(TIME_COLUMN);
break;
+ case ECOMP:
+ families.add(ExternalCompactionColumnFamily.NAME);
+ break;
default:
throw new IllegalArgumentException("Unknown col type " + colToFetch);
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
index 498c8b4..dd07577 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
@@ -36,6 +36,7 @@ public class CompactionExecutorId extends AbstractId<CompactionExecutorId> {
super(canonical);
}
+ // CBUG maybe all of the following methods should not be in SPI
public boolean isExernalId() {
return canonical().startsWith("e.");
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java
index 62ba955..16628e9 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java
@@ -27,6 +27,8 @@ import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import com.google.common.base.Preconditions;
+
/**
* An immutable object that describes what files to compact and where to compact them.
*
@@ -40,14 +42,25 @@ public class CompactionJobImpl implements CompactionJob {
private final Set<CompactableFile> files;
private final CompactionKind kind;
private boolean selectedAll;
+ private boolean hasSelectedAll;
- CompactionJobImpl(long priority, CompactionExecutorId executor, Collection<CompactableFile> files,
- CompactionKind kind, boolean selectedAllFiles) {
+ public CompactionJobImpl(long priority, CompactionExecutorId executor,
+ Collection<CompactableFile> files, CompactionKind kind, boolean selectedAllFiles) {
this.priority = priority;
this.executor = Objects.requireNonNull(executor);
this.files = Set.copyOf(files);
- this.kind = kind;
+ this.kind = Objects.requireNonNull(kind);
this.selectedAll = selectedAllFiles;
+ this.hasSelectedAll = true;
+ }
+
+ public CompactionJobImpl(long priority, CompactionExecutorId executor,
+ Collection<CompactableFile> files, CompactionKind kind) {
+ this.priority = priority;
+ this.executor = Objects.requireNonNull(executor);
+ this.files = Set.copyOf(files);
+ this.kind = Objects.requireNonNull(kind);
+ this.hasSelectedAll = false;
}
@Override
@@ -85,6 +98,7 @@ public class CompactionJobImpl implements CompactionJob {
}
public boolean selectedAll() {
+ Preconditions.checkState(hasSelectedAll);
return selectedAll;
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index c9acb53..d18407d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ch
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
@@ -95,7 +96,8 @@ public class MetadataConstraints implements Constraint {
LastLocationColumnFamily.NAME,
FutureLocationColumnFamily.NAME,
ChoppedColumnFamily.NAME,
- ClonedColumnFamily.NAME));
+ ClonedColumnFamily.NAME,
+ ExternalCompactionColumnFamily.NAME));
// @formatter:on
private static boolean isValidColumn(ColumnUpdate cu) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
index fda24bd..8da4d41 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
@@ -26,11 +26,15 @@ import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
@@ -222,6 +226,19 @@ public abstract class TabletMutatorBase implements Ample.TabletMutator {
return this;
}
+ @Override
+ public TabletMutator putExternalCompaction(ExternalCompactionId ecid,
+ ExternalCompactionMetadata ecMeta) {
+ mutation.put(ExternalCompactionColumnFamily.STR_NAME, ecid.canonical(), ecMeta.toJson());
+ return this;
+ }
+
+ @Override
+ public TabletMutator deleteExternalCompaction(ExternalCompactionId ecid) {
+ mutation.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid.canonical());
+ return this;
+ }
+
protected Mutation getMutation() {
updatesEnabled = false;
return mutation;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
index ab07c06..0623020 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -46,6 +47,7 @@ import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -182,7 +184,7 @@ public class ManagerMetadataUtil {
public static void replaceDatafiles(ServerContext context, KeyExtent extent,
Set<StoredTabletFile> datafilesToDelete, Set<StoredTabletFile> scanFiles, TabletFile path,
Long compactionId, DataFileValue size, String address, TServerInstance lastLocation,
- ZooLock zooLock) {
+ ZooLock zooLock, Optional<ExternalCompactionId> ecid) {
context.getAmple().putGcCandidates(extent.tableId(), datafilesToDelete);
@@ -204,6 +206,9 @@ public class ManagerMetadataUtil {
if (lastLocation != null && !lastLocation.equals(self))
tablet.deleteLocation(lastLocation, LocationType.LAST);
+ if (ecid.isPresent())
+ tablet.deleteExternalCompaction(ecid.get());
+
tablet.putZooLock(zooLock);
tablet.mutate();
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 4fad8ca..3f39e96 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
@@ -55,7 +56,6 @@ import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
-import org.apache.accumulo.server.compaction.ExternalCompactionId;
import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
import org.apache.accumulo.server.compaction.RetryableThriftCall;
import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index d8cebd1..71ec2b6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -91,6 +91,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
@@ -122,7 +123,6 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.compaction.CompactionInfo;
import org.apache.accumulo.server.compaction.Compactor;
-import org.apache.accumulo.server.compaction.ExternalCompactionId;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.fs.TooManyFilesException;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
index f8cc442..07ac59e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
@@ -32,11 +32,11 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.accumulo.server.compaction.ExternalCompactionId;
/**
* Interface between compaction service and tablet.
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index db4d97d..cfbb1ee 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
@@ -41,7 +42,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.fate.util.Retry;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.compaction.ExternalCompactionId;
import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.slf4j.Logger;
@@ -428,6 +428,10 @@ public class CompactionManager {
return getExternalExecutor(CompactionExecutorId.externalId(queueName));
}
+ public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent externt) {
+ runningExternalCompactions.put(ecid, externt);
+ }
+
public void commitExternalCompaction(ExternalCompactionId extCompactionId,
Map<KeyExtent,Tablet> currentTablets, long fileSize, long entries) {
KeyExtent extent = runningExternalCompactions.get(extCompactionId);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
index 3c02127..67830d2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
@@ -28,13 +28,13 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
import org.apache.accumulo.core.tabletserver.thrift.InputFile;
import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
-import org.apache.accumulo.server.compaction.ExternalCompactionId;
public class ExternalCompactionJob {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index b9439a7..21ac1de 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -49,6 +49,8 @@ import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.accumulo.core.spi.compaction.CompactionDispatcher.DispatchParameters;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
@@ -60,7 +62,6 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.server.ServiceEnvironmentImpl;
import org.apache.accumulo.server.compaction.CompactionStats;
import org.apache.accumulo.server.compaction.Compactor.CompactionCanceledException;
-import org.apache.accumulo.server.compaction.ExternalCompactionId;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.tserver.compactions.Compactable;
import org.apache.accumulo.tserver.compactions.CompactionManager;
@@ -119,6 +120,15 @@ public class CompactableImpl implements Compactable {
private volatile boolean closed = false;
+ // TODO move to top of class
+ private static class ExternalCompactionInfo {
+ ExternalCompactionMetadata meta;
+ CompactionJob job;
+ }
+
+ private Map<ExternalCompactionId,ExternalCompactionInfo> externalCompactions =
+ new ConcurrentHashMap<>();
+
// This interface exists for two purposes. First it allows abstraction of new and old
// implementations for user pluggable file selection code. Second it facilitates placing code
// outside of this class.
@@ -131,9 +141,32 @@ public class CompactableImpl implements Compactable {
}
- public CompactableImpl(Tablet tablet, CompactionManager manager) {
+ public CompactableImpl(Tablet tablet, CompactionManager manager,
+ Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions) {
this.tablet = tablet;
this.manager = manager;
+
+ var dataFileSizes = tablet.getDatafileManager().getDatafileSizes();
+
+ extCompactions.forEach((ecid, ecMeta) -> {
+ // CBUG ensure files for each external compaction are disjoint
+ allCompactingFiles.addAll(ecMeta.getJobFiles());
+ Collection<CompactableFile> files = ecMeta.getJobFiles().stream()
+ .map(f -> new CompactableFileImpl(f, dataFileSizes.get(f))).collect(Collectors.toList());
+ CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(),
+ ecMeta.getCompactionExecutorId(), files, ecMeta.getKind());
+ runnningJobs.add(job);
+
+ ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+ ecInfo.job = job;
+ ecInfo.meta = ecMeta;
+ externalCompactions.put(ecid, ecInfo);
+
+ manager.registerExternalCompaction(ecid, getExtent());
+ });
+
+ compactionRunning = !allCompactingFiles.isEmpty();
+
this.servicesInUse = Suppliers.memoizeWithExpiration(() -> {
HashSet<CompactionServiceId> servicesIds = new HashSet<>();
for (CompactionKind kind : CompactionKind.values()) {
@@ -553,9 +586,6 @@ public class CompactableImpl implements Compactable {
CompactionHelper localHelper;
List<IteratorSetting> iters = List.of();
CompactionConfig localCompactionCfg;
- public TabletFile compactTmpName;
- public CompactionJob job;
- public TabletFile newFile;
}
private CompactionInfo reserveFilesForCompaction(CompactionServiceId service, CompactionJob job) {
@@ -720,9 +750,6 @@ public class CompactableImpl implements Compactable {
}
}
- // TODO move to top of class
- private Map<ExternalCompactionId,CompactionInfo> externalCompactions = new ConcurrentHashMap<>();
-
@Override
public ExternalCompactionJob reserveExternalCompaction(CompactionServiceId service,
CompactionJob job, String compactorId) {
@@ -734,18 +761,25 @@ public class CompactableImpl implements Compactable {
// CBUG add external compaction info to metadata table
try {
// CBUG share code w/ CompactableUtil and/or move there
- cInfo.newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" : "C");
- cInfo.compactTmpName = new TabletFile(new Path(cInfo.newFile.getMetaInsert() + "_tmp"));
+ var newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" : "C");
+ var compactTmpName = new TabletFile(new Path(newFile.getMetaInsert() + "_tmp"));
ExternalCompactionId externalCompactionId = ExternalCompactionId.generate();
- cInfo.job = job;
+ ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+
+ ecInfo.meta = new ExternalCompactionMetadata(cInfo.jobFiles, compactTmpName, newFile,
+ compactorId, job.getKind(), job.getPriority(), job.getExecutor());
+ tablet.getContext().getAmple().mutateTablet(getExtent())
+ .putExternalCompaction(externalCompactionId, ecInfo.meta).mutate();
- externalCompactions.put(externalCompactionId, cInfo);
+ ecInfo.job = job;
+
+ externalCompactions.put(externalCompactionId, ecInfo);
// CBUG because this is an RPC the return may never get to the caller... however the caller
// may be alive.... maybe the caller can set the externalCompactionId it working on in ZK
- return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, cInfo.compactTmpName,
+ return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, compactTmpName,
getExtent(), externalCompactionId, job.getPriority(), job.getKind(), cInfo.iters);
} catch (Exception e) {
@@ -758,25 +792,34 @@ public class CompactableImpl implements Compactable {
public void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize,
long entries) {
// CBUG double check w/ java docs that only one thread can remove
- CompactionInfo cInfo = externalCompactions.remove(extCompactionId);
-
- if (cInfo != null) {
- log.debug("Attempting to commit external compaction {}", extCompactionId);
- // TODO do a sanity check that files exists in dfs?
- StoredTabletFile metaFile = null;
- try {
- metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(cInfo.jobFiles,
- cInfo.compactTmpName, cInfo.newFile, compactionId,
- new DataFileValue(fileSize, entries));
- TabletLogger.compacted(getExtent(), cInfo.job, metaFile);
- } catch (Exception e) {
- metaFile = null;
- log.error("Error committing external compaction {}", extCompactionId, e);
- throw new RuntimeException(e);
- } finally {
- completeCompaction(cInfo.job, cInfo.jobFiles, metaFile);
- log.debug("Completed commit of external compaction{}", extCompactionId);
+ ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId);
+
+ if (ecInfo != null) {
+ synchronized (ecInfo) {
+ if (!externalCompactions.containsKey(extCompactionId)) {
+ // since this method is called by RPCs there could be multiple concurrent calls so defend
+ // against that
+ return;
+ }
+ log.debug("Attempting to commit external compaction {}", extCompactionId);
+ // TODO do a sanity check that files exists in dfs?
+ StoredTabletFile metaFile = null;
+ try {
+ metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(
+ ecInfo.meta.getJobFiles(), ecInfo.meta.getCompactTmpName(), ecInfo.meta.getNewFile(),
+ compactionId, new DataFileValue(fileSize, entries), Optional.of(extCompactionId));
+ TabletLogger.compacted(getExtent(), ecInfo.job, metaFile);
+ } catch (Exception e) {
+ metaFile = null;
+ log.error("Error committing external compaction {}", extCompactionId, e);
+ throw new RuntimeException(e);
+ } finally {
+ completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), metaFile);
+ externalCompactions.remove(extCompactionId);
+ log.debug("Completed commit of external compaction{}", extCompactionId);
+ }
}
+
} else {
log.debug("Ignoring request to commit external compaction that is unknown {}",
extCompactionId);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index d6da2f8..f5b0b5e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -616,7 +616,7 @@ public class CompactableUtils {
metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(compactFiles.keySet(),
compactTmpName, newFile, compactionId,
- new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()));
+ new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()), Optional.empty());
return metaFile;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 8a00228..56483bc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -40,6 +41,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.Pair;
@@ -396,8 +398,8 @@ class DatafileManager {
}
StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
- TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId, DataFileValue dfv)
- throws IOException {
+ TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId, DataFileValue dfv,
+ Optional<ExternalCompactionId> ecid) throws IOException {
final KeyExtent extent = tablet.getExtent();
VolumeManager vm = tablet.getTabletServer().getContext().getVolumeManager();
long t1, t2;
@@ -453,7 +455,7 @@ class DatafileManager {
ManagerMetadataUtil.replaceDatafiles(tablet.getContext(), extent, oldDatafiles,
filesInUseByScans, newFile, compactionId, dfv,
tablet.getTabletServer().getClientAddressString(), lastLocation,
- tablet.getTabletServer().getLock());
+ tablet.getTabletServer().getLock(), ecid);
tablet.setLastCompactionID(compactionId);
removeFilesAfterScan(filesInUseByScans);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index c3cd98d..b50b48e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Durability;
@@ -75,6 +76,8 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
@@ -422,24 +425,35 @@ public class Tablet {
// look for hints of a failure on the previous tablet server
if (!logEntries.isEmpty()) {
// look for any temp files hanging around
- removeOldTemporaryFiles();
+ removeOldTemporaryFiles(data.getExternalCompactions());
}
- this.compactable = new CompactableImpl(this, tabletServer.getCompactionManager());
+ this.compactable = new CompactableImpl(this, tabletServer.getCompactionManager(),
+ data.getExternalCompactions());
}
public ServerContext getContext() {
return context;
}
- private void removeOldTemporaryFiles() {
+ private void removeOldTemporaryFiles(
+ Map<ExternalCompactionId,ExternalCompactionMetadata> externalCompactions) {
// remove any temporary files created by a previous tablet server
try {
+
+ var extCompactionFiles = externalCompactions.values().stream()
+ .map(ecMeta -> ecMeta.getCompactTmpName().getPath()).collect(Collectors.toSet());
+
for (Volume volume : getTabletServer().getVolumeManager().getVolumes()) {
String dirUri = volume.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ extent.tableId() + Path.SEPARATOR + dirName;
for (FileStatus tmp : volume.getFileSystem().globStatus(new Path(dirUri, "*_tmp"))) {
+
+ if (extCompactionFiles.contains(tmp.getPath())) {
+ continue;
+ }
+
try {
log.debug("Removing old temp file {}", tmp.getPath());
volume.getFileSystem().delete(tmp.getPath(), false);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
index 2f7cac3..59d9360 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
@@ -30,6 +30,8 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -48,6 +50,7 @@ public class TabletData {
private Map<Long,List<TabletFile>> bulkImported = new HashMap<>();
private long splitTime = 0;
private String directoryName = null;
+ private Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions;
// Read tablet data from metadata tables
public TabletData(TabletMetadata meta) {
@@ -67,6 +70,8 @@ public class TabletData {
meta.getLoaded().forEach((path, txid) -> {
bulkImported.computeIfAbsent(txid, k -> new ArrayList<>()).add(path);
});
+
+ this.extCompactions = meta.getExternalCompactions();
}
// Data pulled from an existing tablet to make a split
@@ -81,6 +86,7 @@ public class TabletData {
this.lastLocation = lastLocation;
this.bulkImported = bulkIngestedFiles;
this.splitTime = System.currentTimeMillis();
+ this.extCompactions = Map.of();
}
public MetadataTime getTime() {
@@ -122,4 +128,8 @@ public class TabletData {
public long getSplitTime() {
return splitTime;
}
+
+ public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactions() {
+ return extCompactions;
+ }
}