You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/18 18:04:18 UTC
[incubator-pinot] 03/05: temp update for adding bunch of stuff
This is an automated email from the ASF dual-hosted git repository.
jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 92bba12ca1bf14207e8eec2b46ee1c7cc50946a4
Author: james Shao <sj...@uber.com>
AuthorDate: Mon Mar 16 11:58:46 2020 -0700
temp update for adding bunch of stuff
---
.../core/data/manager/BaseTableDataManager.java | 10 +-
.../core/data/manager/InstanceDataManager.java | 8 +-
.../pinot/core/data/manager/TableDataManager.java | 10 +-
.../offline/ImmutableSegmentDataManager.java | 17 +++
.../manager/offline/OfflineTableDataManager.java | 18 ++-
.../manager/offline/TableDataManagerProvider.java | 26 +++--
.../offline/UpsertImmutableSegmentDataManager.java | 67 -----------
.../AppendLLRealtimeSegmentDataManager.java | 58 ---------
.../realtime/AppendRealtimeTableDataManager.java | 55 ---------
.../realtime/HLRealtimeSegmentDataManager.java | 5 +-
.../realtime/LLRealtimeSegmentDataManager.java | 26 +++--
.../manager/realtime/RealtimeTableDataManager.java | 48 +++++---
.../realtime/UpsertRealtimeTableDataManager.java | 82 -------------
.../manager/upsert/DataManagerCallback.java} | 32 ++---
.../upsert/DefaultDataManagerCallbackImpl.java | 71 +++++++++++
.../upsert/DefaultIndexSegmentCallback.java | 65 +++++++++++
.../DefaultTableDataManagerCallbackImpl.java} | 36 +++---
.../manager/upsert/IndexSegmentCallback.java} | 32 ++---
.../manager/upsert/TableDataManagerCallback.java} | 20 +++-
.../upsert/TableDataManagerCallbackProvider.java | 91 +++++++++++++++
.../immutable/ImmutableSegmentImpl.java | 32 +++--
.../immutable/ImmutableSegmentLoader.java | 26 ++---
.../indexsegment/mutable/MutableSegmentImpl.java | 24 ++--
...entUpdaterConfig.java => WatermarkManager.java} | 16 ++-
.../data/manager/BaseTableDataManagerTest.java | 20 ++--
.../realtime/LLRealtimeSegmentDataManagerTest.java | 5 +-
.../mutable/MutableSegmentImplTestUtils.java | 3 +-
.../SegmentGenerationWithNullValueVectorTest.java | 3 +-
.../core/segment/index/loader/LoaderTest.java | 10 +-
.../pinot/queries/SerializedBytesQueriesTest.java | 2 +
.../pinot/query/executor/QueryExecutorTest.java | 3 +-
pinot-grigio/{ => pinot-grigio-provided}/pom.xml | 49 +++++---
.../upsert/UpsertDataManagerCallbackImpl.java | 106 +++++++++--------
.../UpsertImmutableIndexSegmentCallback.java | 130 ++++++++++-----------
.../upsert/UpsertMutableIndexSegmentCallback.java | 55 +++++----
.../upsert/UpsertTableDataManagerCallbackImpl.java | 63 ++++++++++
.../pinot/core/segment/updater/SegmentUpdater.java | 28 ++---
.../core/segment/updater/SegmentUpdaterConfig.java | 0
.../segment/updater/UpsertWatermarkManager.java | 20 ++--
.../UpsertImmutableIndexSegmentCallbackTest.java | 17 ++-
pinot-grigio/pom.xml | 1 +
.../starter/helix/HelixInstanceDataManager.java | 16 ++-
.../server/starter/helix/HelixServerStarter.java | 2 +
.../SegmentOnlineOfflineStateModelFactory.java | 9 ++
.../upsert/DefaultUpsertComponentContainer.java | 34 ++++--
.../server/upsert/SegmentDeletionHandler.java | 55 +++++++++
.../server/upsert/UpsertComponentContainer.java | 20 +++-
.../upsert/UpsertComponentContainerProvider.java | 58 +++++++++
.../apache/pinot/server/api/BaseResourceTest.java | 3 +-
.../realtime/provisioning/MemoryEstimator.java | 9 +-
50 files changed, 961 insertions(+), 635 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index a39bdfc..cb278b4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.slf4j.Logger;
@@ -108,14 +109,14 @@ public abstract class BaseTableDataManager implements TableDataManager {
* @param immutableSegment Immutable segment to add
*/
@Override
- public void addSegment(ImmutableSegment immutableSegment) {
+ public void addSegment(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback) {
String segmentName = immutableSegment.getSegmentName();
_logger.info("Adding immutable segment: {} to table: {}", segmentName, _tableNameWithType);
_serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT,
immutableSegment.getSegmentMetadata().getTotalRawDocs());
_serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
- ImmutableSegmentDataManager newSegmentManager = getImmutableSegmentDataManager(immutableSegment);
+ ImmutableSegmentDataManager newSegmentManager = getImmutableSegmentDataManager(immutableSegment, dataManagerCallback);
SegmentDataManager oldSegmentManager = _segmentDataManagerMap.put(segmentName, newSegmentManager);
if (oldSegmentManager == null) {
_logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType);
@@ -212,7 +213,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
return _tableNameWithType;
}
- protected ImmutableSegmentDataManager getImmutableSegmentDataManager(ImmutableSegment immutableSegment) {
- return new ImmutableSegmentDataManager(immutableSegment);
+ protected ImmutableSegmentDataManager getImmutableSegmentDataManager(ImmutableSegment immutableSegment,
+ DataManagerCallback callback) {
+ return new ImmutableSegmentDataManager(immutableSegment, callback);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 175f5a7..eba0689 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -132,8 +132,8 @@ public interface InstanceDataManager {
*/
ZkHelixPropertyStore<ZNRecord> getPropertyStore();
- /**
- * Return the mappings from partition -> low water marks of all the tables hosted in this server.
- */
- Map<String, Map<Integer, Long>> getLowWaterMarks();
+// /**
+// * Return the mappings from partition -> low water marks of all the tables hosted in this server.
+// */
+// Map<String, Map<Integer, Long>> getLowWaterMarks();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
index 7b1c79c..46aa745 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
@@ -27,6 +27,8 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.apache.pinot.core.data.manager.upsert.TableDataManagerCallback;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
@@ -58,7 +60,7 @@ public interface TableDataManager {
/**
* Adds a loaded immutable segment into the table.
*/
- void addSegment(ImmutableSegment immutableSegment);
+ void addSegment(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback);
/**
* Adds a segment from local disk into the OFFLINE table.
@@ -116,4 +118,10 @@ public interface TableDataManager {
* Returns the table name managed by this instance.
*/
String getTableName();
+
+ /**
+ * Returns the callback object for this table: a no-op callback for a regular table, or the upsert callback
+ * for an upsert-enabled table.
+ */
+ TableDataManagerCallback getTableDataManagerCallback();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
index 0401f61..949affe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
@@ -18,9 +18,14 @@
*/
package org.apache.pinot.core.data.manager.offline;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import java.io.IOException;
+
/**
* Segment data manager for immutable segment.
@@ -28,9 +33,20 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
public class ImmutableSegmentDataManager extends SegmentDataManager {
protected final ImmutableSegment _immutableSegment;
+ protected final DataManagerCallback _dataManagerCallback;
public ImmutableSegmentDataManager(ImmutableSegment immutableSegment) {
+ this(immutableSegment, DefaultDataManagerCallbackImpl.INSTANCE);
+ }
+
+ public ImmutableSegmentDataManager(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback) {
_immutableSegment = immutableSegment;
+ _dataManagerCallback = dataManagerCallback;
+ try {
+ _dataManagerCallback.initVirtualColumns();
+ } catch (IOException ex) {
+ ExceptionUtils.rethrow(ex);
+ }
}
@Override
@@ -45,6 +61,7 @@ public class ImmutableSegmentDataManager extends SegmentDataManager {
@Override
public void destroy() {
+ _dataManagerCallback.destroy();
_immutableSegment.destroy();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
index c2c9351..61230a4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
@@ -20,6 +20,9 @@ package org.apache.pinot.core.data.manager.offline;
import java.io.File;
import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.apache.pinot.core.data.manager.upsert.TableDataManagerCallback;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
@@ -33,6 +36,12 @@ import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
@ThreadSafe
public class OfflineTableDataManager extends BaseTableDataManager {
+ private TableDataManagerCallback _tableDataManagerCallback;
+
+ public OfflineTableDataManager(TableDataManagerCallback tableDataManagerCallback) {
+ _tableDataManagerCallback = tableDataManagerCallback;
+ }
+
@Override
protected void doInit() {
}
@@ -49,6 +58,13 @@ public class OfflineTableDataManager extends BaseTableDataManager {
public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
throws Exception {
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+ DataManagerCallback callback = _tableDataManagerCallback.getDefaultDataManagerCallback();
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
+ callback, schema), callback);
+ }
+
+ @Override
+ public TableDataManagerCallback getTableDataManagerCallback() {
+ return _tableDataManagerCallback;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 3ec1bbd..f5b009f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.data.manager.offline;
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -25,8 +27,8 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
-import org.apache.pinot.core.data.manager.realtime.AppendRealtimeTableDataManager;
-import org.apache.pinot.core.data.manager.realtime.UpsertRealtimeTableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.core.data.manager.upsert.TableDataManagerCallbackProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +43,18 @@ public class TableDataManagerProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(TableDataManagerProvider.class);
private static Semaphore _segmentBuildSemaphore;
+ private static TableDataManagerCallbackProvider _tableDataManagerCallbackProvider;
private TableDataManagerProvider() {
}
public static void init(InstanceDataManagerConfig instanceDataManagerConfig) {
+ init(instanceDataManagerConfig, new TableDataManagerCallbackProvider(new PropertiesConfiguration()));
+ }
+
+ public static void init(InstanceDataManagerConfig instanceDataManagerConfig,
+ TableDataManagerCallbackProvider callbackProvider) {
+ _tableDataManagerCallbackProvider = callbackProvider;
int maxParallelBuilds = instanceDataManagerConfig.getMaxParallelSegmentBuilds();
if (maxParallelBuilds > 0) {
_segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
@@ -55,17 +64,16 @@ public class TableDataManagerProvider {
public static TableDataManager getTableDataManager(@Nonnull TableDataManagerConfig tableDataManagerConfig,
@Nonnull String instanceId, @Nonnull ZkHelixPropertyStore<ZNRecord> propertyStore,
@Nonnull ServerMetrics serverMetrics) {
- TableDataManager tableDataManager;
+ Preconditions.checkNotNull(_tableDataManagerCallbackProvider, "callback provider is not init");
+ final TableDataManager tableDataManager;
switch (CommonConstants.Helix.TableType.valueOf(tableDataManagerConfig.getTableDataManagerType())) {
case OFFLINE:
- tableDataManager = new OfflineTableDataManager();
+ tableDataManager = new OfflineTableDataManager(
+ _tableDataManagerCallbackProvider.getDefaultTableDataManagerCallback());
break;
case REALTIME:
- if (tableDataManagerConfig.getUpdateSemantic() == CommonConstants.UpdateSemantic.UPSERT) {
- tableDataManager = new UpsertRealtimeTableDataManager(_segmentBuildSemaphore);
- } else {
- tableDataManager = new AppendRealtimeTableDataManager(_segmentBuildSemaphore);
- }
+ tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore,
+ _tableDataManagerCallbackProvider.getTableDataManagerCallback(tableDataManagerConfig));
break;
default:
throw new IllegalStateException();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/UpsertImmutableSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/UpsertImmutableSegmentDataManager.java
deleted file mode 100644
index 790e912..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/UpsertImmutableSegmentDataManager.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.offline;
-
-import org.apache.pinot.common.config.TableNameBuilder;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
-import org.apache.pinot.core.indexsegment.UpsertSegment;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
-import org.apache.pinot.core.segment.updater.SegmentUpdater;
-import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
-
-import java.io.IOException;
-import java.util.List;
-
-public class UpsertImmutableSegmentDataManager extends ImmutableSegmentDataManager implements UpsertSegmentDataManager {
-
- private String _tableNameWithType;
-
- public UpsertImmutableSegmentDataManager(ImmutableSegment immutableSegment) throws IOException {
- super(immutableSegment);
- _tableNameWithType = TableNameBuilder.ensureTableNameWithType(immutableSegment.getSegmentMetadata().getTableName(),
- CommonConstants.Helix.TableType.REALTIME);
- initVirtualColumns();
- }
-
- @Override
- public void updateVirtualColumns(List<UpdateLogEntry> messages) {
- ((UpsertSegment) _immutableSegment).updateVirtualColumn(messages);
- }
-
- @Override
- public String getVirtualColumnInfo(long offset) {
- return ((UpsertSegment) _immutableSegment).getVirtualColumnInfo(offset);
- }
-
- @Override
- public void destroy() {
- SegmentUpdater.getInstance().removeSegmentDataManager(_tableNameWithType, getSegmentName(), this);
- super.destroy();
- }
-
- private void initVirtualColumns() throws IOException {
- // 1. add listener for update events
- // 2. load all existing messages
- // ensure the above orders so we can ensure all events are received by this data manager
- SegmentUpdater.getInstance().addSegmentDataManager(_tableNameWithType, new LLCSegmentName(getSegmentName()), this);
- ((UpsertSegment) _immutableSegment).initVirtualColumn();
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendLLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendLLRealtimeSegmentDataManager.java
deleted file mode 100644
index f725cc8..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendLLRealtimeSegmentDataManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.indexsegment.mutable.MutableAppendSegmentImpl;
-import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-
-import java.util.concurrent.Semaphore;
-
-public class AppendLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
-
- public AppendLLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
- RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
- Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics) {
- super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, indexLoadingConfig, schema,
- llcSegmentName, partitionConsumerSemaphore, serverMetrics);
- }
-
- @Override
- protected MutableSegmentImpl createMutableSegment(RealtimeSegmentConfig config) {
- return new MutableAppendSegmentImpl(config);
- }
-
- @Override
- protected void processTransformedRow(GenericRow row, long offset) {
- // do nothing
- }
-
- @Override
- protected void postIndexProcessing(GenericRow row, long offset) {
- // do nothing
- }
-
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendRealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendRealtimeTableDataManager.java
deleted file mode 100644
index 741fb08..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendRealtimeTableDataManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.spi.data.Schema;
-
-import java.io.File;
-import java.util.concurrent.Semaphore;
-
-public class AppendRealtimeTableDataManager extends RealtimeTableDataManager {
-
- public AppendRealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
- super(segmentBuildSemaphore);
- }
-
- @Override
- protected LLRealtimeSegmentDataManager getLLRealtimeSegmentDataManager(
- RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, TableConfig tableConfig,
- RealtimeTableDataManager realtimeTableDataManager, String indexDirPath, IndexLoadingConfig indexLoadingConfig,
- Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics)
- throws Exception {
- return new AppendLLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, realtimeTableDataManager,
- indexDirPath, indexLoadingConfig, schema, llcSegmentName, partitionConsumerSemaphore, serverMetrics);
- }
-
- @Override
- protected ImmutableSegment loadImmutableSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema)
- throws Exception {
- return ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema);
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 086812e..d767e98 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.core.indexsegment.mutable.MutableAppendSegmentImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -195,7 +195,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
indexLoadingConfig.isDirectRealtimeOffheapAllocation(), serverMetrics))
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
.setNullHandlingEnabled(indexingConfig.isNullHandlingEnabled()).build();
- realtimeSegment = new MutableAppendSegmentImpl(realtimeSegmentConfig);
+ realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig,
+ DefaultDataManagerCallbackImpl.INSTANCE.getIndexSegmentCallback());
notifier = realtimeTableDataManager;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 7765e9e..4f3ae50 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -50,6 +50,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.CompletionMode;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
@@ -83,7 +84,7 @@ import org.slf4j.LoggerFactory;
/**
* Segment data manager for low level consumer realtime segments, which manages consumption and segment completion.
*/
-public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
+public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
protected enum State {
// The state machine starts off with this state. While in this state we consume stream events
// and index them in memory. We continue to be in this state until the end criteria is satisfied
@@ -272,6 +273,9 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
private final boolean _nullHandlingEnabled;
private final SegmentCommitterFactory _segmentCommitterFactory;
+ // upsert/append related components
+ private final DataManagerCallback _dataManagerCallback;
+
// TODO each time this method is called, we print reason for stop. Good to print only once.
private boolean endCriteriaReached() {
Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
@@ -416,6 +420,7 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
_serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
}
+ _dataManagerCallback.postConsumeLoop();
return true;
}
@@ -467,14 +472,14 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
GenericRow transformedRow = _recordTransformer.transform(decodedRow);
if (transformedRow != null) {
- processTransformedRow(transformedRow, _currentOffset);
+ _dataManagerCallback.processTransformedRow(transformedRow, _currentOffset);
realtimeRowsConsumedMeter =
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
realtimeRowsConsumedMeter);
indexedMessageCount++;
canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
- postIndexProcessing(transformedRow, _currentOffset);
+ _dataManagerCallback.postIndexProcessing(transformedRow, _currentOffset);
} else {
realtimeRowsDroppedMeter = _serverMetrics
.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
@@ -505,11 +510,6 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
}
}
- protected abstract void processTransformedRow(GenericRow row, long offset);
-
-
- protected abstract void postIndexProcessing(GenericRow row, long offset);
-
public class PartitionConsumer implements Runnable {
public void run() {
long initialConsumptionEnd = 0L;
@@ -1017,6 +1017,7 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
}
public void destroy() {
+ _dataManagerCallback.destroy();
try {
stop();
} catch (InterruptedException e) {
@@ -1054,7 +1055,8 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
// If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here.
public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
- Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics) {
+ Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics,
+ DataManagerCallback dataManagerCallback) {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
_tableConfig = tableConfig;
@@ -1064,6 +1066,7 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
_indexLoadingConfig = indexLoadingConfig;
_schema = schema;
_serverMetrics = serverMetrics;
+ _dataManagerCallback = dataManagerCallback;
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getServerInstance();
_leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
@@ -1185,7 +1188,8 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
}
}
- _realtimeSegment = createMutableSegment(realtimeSegmentConfigBuilder.build());
+ _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(),
+ _dataManagerCallback.getIndexSegmentCallback());
_startOffset = _segmentZKMetadata.getStartOffset();
_currentOffset = _startOffset;
_resourceTmpDir = new File(resourceDataDir, "_tmp");
@@ -1221,8 +1225,6 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
start();
}
- protected abstract MutableSegmentImpl createMutableSegment(RealtimeSegmentConfig config);
-
/**
* Creates a new stream consumer
* @param reason
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 2be3ca1..bc5316d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -48,6 +48,8 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.apache.pinot.core.data.manager.upsert.TableDataManagerCallback;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
@@ -59,12 +61,13 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@ThreadSafe
-public abstract class RealtimeTableDataManager extends BaseTableDataManager {
+public class RealtimeTableDataManager extends BaseTableDataManager {
private final ExecutorService _segmentAsyncExecutorService =
Executors.newSingleThreadExecutor(new NamedThreadFactory("SegmentAsyncExecutorService"));
private SegmentBuildTimeLeaseExtender _leaseExtender;
private RealtimeSegmentStatsHistory _statsHistory;
private final Semaphore _segmentBuildSemaphore;
+ private final TableDataManagerCallback _tableDataManagerCallback;
// Maintains a map of partitionIds to semaphores.
// The semaphore ensures that exactly one PartitionConsumer instance consumes from any stream partition.
// In some streams, it's possible that having multiple consumers (with the same consumer name on the same host) consuming from the same stream partition can lead to bugs.
@@ -90,12 +93,14 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
// likely that we get fresh data each time instead of multiple copies of roughly same data.
private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
- public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
+ public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, TableDataManagerCallback tableDataManagerCallback) {
_segmentBuildSemaphore = segmentBuildSemaphore;
+ _tableDataManagerCallback = tableDataManagerCallback;
}
@Override
protected void doInit() {
+ _tableDataManagerCallback.init();
_leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType);
File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
@@ -227,7 +232,9 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
if (indexDir.exists() && (realtimeSegmentZKMetadata.getStatus() == Status.DONE)) {
// Segment already exists on disk, and metadata has been committed. Treat it like an offline segment
- addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema));
+ DataManagerCallback callback = _tableDataManagerCallback
+ .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false);
+ addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, callback), callback);
} else {
// Either we don't have the segment on disk or we have not committed in ZK. We should be starting the consumer
// for realtime segment here. If we wrote it on disk but could not get to commit to zk yet, we should replace the
@@ -257,15 +264,21 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int streamPartitionId = llcSegmentName.getPartitionId();
_partitionIdToSemaphoreMap.putIfAbsent(streamPartitionId, new Semaphore(1));
- manager = getLLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this,
- _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName,
- _partitionIdToSemaphoreMap.get(streamPartitionId), _serverMetrics);
+ manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig,
+ this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName,
+ _partitionIdToSemaphoreMap.get(streamPartitionId), _serverMetrics,
+ _tableDataManagerCallback.getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, true));
}
_logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
_segmentDataManagerMap.put(segmentName, manager);
}
}
+ @Override
+ public TableDataManagerCallback getTableDataManagerCallback() {
+ return _tableDataManagerCallback;
+ }
+
public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
IndexLoadingConfig indexLoadingConfig, Schema schema) {
final String uri = llcSegmentMetadata.getDownloadUrl();
@@ -297,7 +310,8 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
ZKMetadataProvider.setRealtimeSegmentZKMetadata(_propertyStore, _tableNameWithType, segmentZKMetadata);
File indexDir = new File(_indexDir, segmentZKMetadata.getSegmentName());
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig),
+ _tableDataManagerCallback.getDefaultDataManagerCallback());
}
/**
@@ -306,7 +320,9 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
public void replaceLLSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, Schema schema) {
try {
File indexDir = new File(_indexDir, segmentName);
- addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema));
+ DataManagerCallback dataManagerCallback = _tableDataManagerCallback
+ .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false);
+ addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, dataManagerCallback), dataManagerCallback);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -316,17 +332,11 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
return _instanceId;
}
- /**
- * allow {@link UpsertRealtimeTableDataManager} to override this method
- */
- protected abstract LLRealtimeSegmentDataManager getLLRealtimeSegmentDataManager(
- RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, TableConfig tableConfig,
- RealtimeTableDataManager realtimeTableDataManager, String indexDirPath, IndexLoadingConfig indexLoadingConfig,
- Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics)
- throws Exception;
-
- protected abstract ImmutableSegment loadImmutableSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema)
- throws Exception;
+ protected ImmutableSegment loadImmutableSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema,
+ DataManagerCallback dataManagerCallback)
+ throws Exception {
+ return ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, dataManagerCallback, schema);
+ }
/**
* Validate a schema against the table config for real-time record consumption.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertRealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertRealtimeTableDataManager.java
deleted file mode 100644
index 12569db..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertRealtimeTableDataManager.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.manager.offline.UpsertImmutableSegmentDataManager;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
-import org.apache.pinot.spi.data.Schema;
-
-import javax.annotation.Nonnull;
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Semaphore;
-
-public class UpsertRealtimeTableDataManager extends RealtimeTableDataManager {
- private UpdateLogStorageProvider _updateLogStorageProvider;
-
- public UpsertRealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
- super(segmentBuildSemaphore);
- _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
- }
-
- @Override
- public void addSegment(@Nonnull String segmentName, @Nonnull TableConfig tableConfig,
- @Nonnull IndexLoadingConfig indexLoadingConfig) throws Exception {
- _updateLogStorageProvider.addSegment(_tableNameWithType, segmentName);
- super.addSegment(segmentName, tableConfig, indexLoadingConfig);
- }
-
- @Override
- protected LLRealtimeSegmentDataManager getLLRealtimeSegmentDataManager(
- RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, TableConfig tableConfig,
- RealtimeTableDataManager realtimeTableDataManager, String indexDirPath, IndexLoadingConfig indexLoadingConfig,
- Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics)
- throws Exception {
- return new UpsertLLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, realtimeTableDataManager,
- indexDirPath, indexLoadingConfig, schema, llcSegmentName, partitionConsumerSemaphore, serverMetrics);
- }
-
- @Override
- protected ImmutableSegmentDataManager getImmutableSegmentDataManager(ImmutableSegment immutableSegment) {
- try {
- return new UpsertImmutableSegmentDataManager(immutableSegment);
- } catch (IOException e) {
- throw new RuntimeException("failed to init the upsert immutable segment", e);
- }
- }
-
- @Override
- protected ImmutableSegment loadImmutableSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema) {
- try {
- return ImmutableSegmentLoader.loadUpsertSegment(indexDir, indexLoadingConfig, schema);
- } catch (Exception e) {
- throw new RuntimeException("failed to load immutable segment", e);
- }
- }
-
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
similarity index 60%
copy from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
copy to pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
index 1c9effc..466553e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
@@ -16,22 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.indexsegment.mutable;
+package org.apache.pinot.core.data.manager.upsert;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class MutableAppendSegmentImpl extends MutableSegmentImpl {
- private static final Logger LOGGER = LoggerFactory.getLogger(MutableUpsertSegmentImpl.class);
+import java.io.IOException;
+import java.util.List;
- public MutableAppendSegmentImpl(RealtimeSegmentConfig config) {
- super(config);
- }
+public interface DataManagerCallback {
- @Override
- protected void postProcessRecords(GenericRow row, int docId) {
- // nothing
- }
+ IndexSegmentCallback getIndexSegmentCallback();
+
+ void processTransformedRow(GenericRow row, long offset);
+
+ void postIndexProcessing(GenericRow row, long offset);
+
+ void postConsumeLoop();
+
+ void initVirtualColumns() throws IOException;
+
+ void updateVirtualColumns(List<UpdateLogEntry> messages);
+
+ void destroy();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultDataManagerCallbackImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultDataManagerCallbackImpl.java
new file mode 100644
index 0000000..347c541
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultDataManagerCallbackImpl.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.upsert;
+
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * No-op implementation of {@link DataManagerCallback}.
+ *
+ * <p>Every hook is an empty method and the segment-level callback handed out is the shared
+ * {@link DefaultIndexSegmentCallback#INSTANCE}. The class holds no per-segment state, so the
+ * single {@link #INSTANCE} is reused; the constructor is private to enforce that.
+ */
+public class DefaultDataManagerCallbackImpl implements DataManagerCallback {
+
+  // expect to use the cached instance for append mode to reduce memory usage
+  public static final DefaultDataManagerCallbackImpl INSTANCE = new DefaultDataManagerCallbackImpl();
+
+  private final IndexSegmentCallback _indexSegmentCallback;
+
+  private DefaultDataManagerCallbackImpl() {
+    _indexSegmentCallback = DefaultIndexSegmentCallback.INSTANCE;
+  }
+
+  /**
+   * @return the shared no-op {@link IndexSegmentCallback}
+   */
+  @Override
+  public IndexSegmentCallback getIndexSegmentCallback() {
+    return _indexSegmentCallback;
+  }
+
+  @Override
+  public void processTransformedRow(GenericRow row, long offset) {
+    // do nothing
+  }
+
+  @Override
+  public void postIndexProcessing(GenericRow row, long offset) {
+    // do nothing
+  }
+
+  @Override
+  public void postConsumeLoop() {
+    // do nothing
+  }
+
+  @Override
+  public void initVirtualColumns() throws IOException {
+    // do nothing
+  }
+
+  @Override
+  public void updateVirtualColumns(List<UpdateLogEntry> messages) {
+    // do nothing
+  }
+
+  @Override
+  public void destroy() {
+    // do nothing
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
new file mode 100644
index 0000000..0299cb7
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.upsert;
+
+import joptsimple.internal.Strings;
+import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+import java.util.Map;
+
+/**
+ * No-op implementation of {@link IndexSegmentCallback}.
+ *
+ * <p>All hooks are empty and {@link #getVirtualColumnInfo(long)} always returns an empty string.
+ * The class is stateless, so only the shared {@link #INSTANCE} exists; the constructor is private.
+ */
+public class DefaultIndexSegmentCallback implements IndexSegmentCallback {
+
+  public static final DefaultIndexSegmentCallback INSTANCE = new DefaultIndexSegmentCallback();
+
+  private DefaultIndexSegmentCallback() {
+  }
+
+  @Override
+  public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) {
+    // do nothing
+  }
+
+  @Override
+  public void initOffsetColumn(ColumnIndexContainer offsetColumnContainer) {
+    // do nothing
+  }
+
+  @Override
+  public void postProcessRecords(GenericRow row, int docId) {
+    // do nothing
+  }
+
+  @Override
+  public void initVirtualColumn() {
+    // do nothing
+  }
+
+  @Override
+  public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
+    // do nothing
+  }
+
+  /**
+   * @param offset ignored by this implementation
+   * @return always the empty string
+   */
+  @Override
+  public String getVirtualColumnInfo(long offset) {
+    // Plain literal rather than joptsimple.internal.Strings.EMPTY: same value, without depending on
+    // an internal class of a command-line parsing library.
+    return "";
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
similarity index 52%
copy from pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
copy to pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
index 0401f61..2e56455 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
@@ -16,40 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.data.manager.offline;
+package org.apache.pinot.core.data.manager.upsert;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.data.Schema;
+public class DefaultTableDataManagerCallbackImpl implements TableDataManagerCallback {
-/**
- * Segment data manager for immutable segment.
- */
-public class ImmutableSegmentDataManager extends SegmentDataManager {
-
- protected final ImmutableSegment _immutableSegment;
-
- public ImmutableSegmentDataManager(ImmutableSegment immutableSegment) {
- _immutableSegment = immutableSegment;
- }
+ private static final DefaultDataManagerCallbackImpl DEFAULT_DM_CALLBACK = DefaultDataManagerCallbackImpl.INSTANCE;
@Override
- public String getSegmentName() {
- return _immutableSegment.getSegmentName();
+ public void init() {
}
@Override
- public ImmutableSegment getSegment() {
- return _immutableSegment;
+ public void addSegment(String tableName, String segmentName, TableConfig tableConfig) {
}
@Override
- public void destroy() {
- _immutableSegment.destroy();
+ public DataManagerCallback getDataManagerCallback(String tableName, String segmentName,
+ Schema schema, ServerMetrics serverMetrics, boolean isMutable) {
+ return DEFAULT_DM_CALLBACK;
}
@Override
- public String toString() {
- return "ImmutableSegmentDataManager(" + _immutableSegment.getSegmentName() + ")";
+ public DataManagerCallback getDefaultDataManagerCallback() {
+ return DEFAULT_DM_CALLBACK;
}
+
+
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
similarity index 52%
copy from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
copy to pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
index 1c9effc..dd198f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
@@ -16,22 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.indexsegment.mutable;
+package org.apache.pinot.core.data.manager.upsert;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class MutableAppendSegmentImpl extends MutableSegmentImpl {
- private static final Logger LOGGER = LoggerFactory.getLogger(MutableUpsertSegmentImpl.class);
+import java.io.IOException;
+import java.util.Map;
- public MutableAppendSegmentImpl(RealtimeSegmentConfig config) {
- super(config);
- }
+public interface IndexSegmentCallback {
- @Override
- protected void postProcessRecords(GenericRow row, int docId) {
- // nothing
- }
+ void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader);
+
+ void initOffsetColumn(ColumnIndexContainer offsetColumnContainer);
+
+ void postProcessRecords(GenericRow row, int docId);
+
+ void initVirtualColumn() throws IOException;
+
+ void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries);
+
+ String getVirtualColumnInfo(long offset);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
similarity index 59%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
copy to pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
index 97d48f3..ec6a15f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
@@ -16,10 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.updater;
+package org.apache.pinot.core.data.manager.upsert;
-public class SegmentUpdaterConfig {
- public static final String SEGMENT_UDPATE_SLEEP_MS = "sleep.ms";
- public static final String INPUT_TOPIC_PREFIX = "input.topic.prefix";
- public static final int SEGMENT_UDPATE_SLEEP_MS_DEFAULT = 100;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.data.Schema;
+
+public interface TableDataManagerCallback {
+
+ void init();
+
+ void addSegment(String tableName, String segmentName, TableConfig tableConfig);
+
+ DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
+ ServerMetrics serverMetrics, boolean isMutable);
+
+ DataManagerCallback getDefaultDataManagerCallback();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
new file mode 100644
index 0000000..2e868a2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.upsert;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads the configured {@link TableDataManagerCallback} implementation classes and creates a new
+ * callback instance per request.
+ *
+ * <p>The append (default) class name is read from {@link #DEFAULT_CALLBACK_CLASS_CONFIG_KEY} and falls
+ * back to {@link #CALLBACK_CLASS_CONFIG_DEFAULT}. The upsert class name is read from
+ * {@link #UPSERT_CALLBACK_CLASS_CONFIG_KEY} and is optional.
+ */
+public class TableDataManagerCallbackProvider {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableDataManagerCallbackProvider.class);
+
+  public static final String UPSERT_CALLBACK_CLASS_CONFIG_KEY = "upsert.tableDataManager.callback";
+  public static final String DEFAULT_CALLBACK_CLASS_CONFIG_KEY = "append.tableDataManager.callback";
+  public static final String CALLBACK_CLASS_CONFIG_DEFAULT = DefaultTableDataManagerCallbackImpl.class.getName();
+
+  private final Class<? extends TableDataManagerCallback> defaultTableDataManagerCallBackClass;
+  // null when no upsert callback class is configured
+  private final Class<? extends TableDataManagerCallback> upsertTableDataManagerCallBackClass;
+
+  /**
+   * Resolves both callback classes eagerly so that a bad configuration fails at construction time.
+   *
+   * @param configuration configuration holding the callback class names
+   * @throws IllegalStateException if a configured class does not implement {@link TableDataManagerCallback}
+   */
+  public TableDataManagerCallbackProvider(Configuration configuration) {
+    String appendClassName = configuration.getString(DEFAULT_CALLBACK_CLASS_CONFIG_KEY, CALLBACK_CLASS_CONFIG_DEFAULT);
+    String upsertClassName = configuration.getString(UPSERT_CALLBACK_CLASS_CONFIG_KEY);
+    defaultTableDataManagerCallBackClass = loadCallbackClass(appendClassName);
+    upsertTableDataManagerCallBackClass =
+        StringUtils.isNotEmpty(upsertClassName) ? loadCallbackClass(upsertClassName) : null;
+  }
+
+  /**
+   * Creates the callback matching the update semantic of the given table config: the upsert callback
+   * for {@code UPSERT}, the default callback otherwise.
+   */
+  public TableDataManagerCallback getTableDataManagerCallback(TableDataManagerConfig tableDataManagerConfig) {
+    if (tableDataManagerConfig.getUpdateSemantic() == CommonConstants.UpdateSemantic.UPSERT) {
+      return getUpsertTableDataManagerCallback();
+    } else {
+      return getDefaultTableDataManagerCallback();
+    }
+  }
+
+  /**
+   * @return a new instance of the configured upsert callback class
+   * @throws IllegalStateException if no upsert callback class was configured
+   */
+  public TableDataManagerCallback getUpsertTableDataManagerCallback() {
+    // Fail with a descriptive message instead of a NullPointerException when the key is missing.
+    Preconditions.checkState(upsertTableDataManagerCallBackClass != null,
+        "no upsert table data manager callback class configured under key %s", UPSERT_CALLBACK_CLASS_CONFIG_KEY);
+    return instantiate(upsertTableDataManagerCallBackClass);
+  }
+
+  /**
+   * @return a new instance of the configured default (append) callback class
+   */
+  public TableDataManagerCallback getDefaultTableDataManagerCallback() {
+    return instantiate(defaultTableDataManagerCallBackClass);
+  }
+
+  /** Loads {@code className} and verifies that it implements {@link TableDataManagerCallback}. */
+  private static Class<? extends TableDataManagerCallback> loadCallbackClass(String className) {
+    Class<?> loadedClass;
+    try {
+      loadedClass = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      LOGGER.error("failed to load table data manager class {}", className, e);
+      return ExceptionUtils.rethrow(e);
+    }
+    // The interface must be assignable FROM the loaded class, i.e. the loaded class implements it.
+    Preconditions.checkState(TableDataManagerCallback.class.isAssignableFrom(loadedClass),
+        "configured class %s is not a %s", className, TableDataManagerCallback.class.getName());
+    return loadedClass.asSubclass(TableDataManagerCallback.class);
+  }
+
+  /** Creates a new callback through the no-arg constructor, logging and rethrowing any failure. */
+  private static TableDataManagerCallback instantiate(Class<? extends TableDataManagerCallback> callbackClass) {
+    try {
+      return callbackClass.newInstance();
+    } catch (Exception ex) {
+      LOGGER.error("failed to initialize new table data manager callback {}", callbackClass.getName(), ex);
+      return ExceptionUtils.rethrow(ex);
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
index 21f056c..5e070ba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -19,10 +19,11 @@
package org.apache.pinot.core.indexsegment.immutable;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import javax.annotation.Nullable;
+
+import org.apache.pinot.core.data.manager.upsert.IndexSegmentCallback;
+import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -43,18 +44,33 @@ import org.slf4j.LoggerFactory;
public class ImmutableSegmentImpl implements ImmutableSegment {
private static final Logger LOGGER = LoggerFactory.getLogger(ImmutableSegmentImpl.class);
- protected final SegmentDirectory _segmentDirectory;
- protected final SegmentMetadataImpl _segmentMetadata;
- protected final Map<String, ColumnIndexContainer> _indexContainerMap;
- protected final StarTreeIndexContainer _starTreeIndexContainer;
+ private final SegmentDirectory _segmentDirectory;
+ private final SegmentMetadataImpl _segmentMetadata;
+ private final Map<String, ColumnIndexContainer> _indexContainerMap;
+ private final StarTreeIndexContainer _starTreeIndexContainer;
+ private final IndexSegmentCallback _segmentCallback;
+
public ImmutableSegmentImpl(SegmentDirectory segmentDirectory, SegmentMetadataImpl segmentMetadata,
Map<String, ColumnIndexContainer> columnIndexContainerMap,
- @Nullable StarTreeIndexContainer starTreeIndexContainer) {
+ @Nullable StarTreeIndexContainer starTreeIndexContainer, IndexSegmentCallback segmentCallback) {
_segmentDirectory = segmentDirectory;
_segmentMetadata = segmentMetadata;
_indexContainerMap = columnIndexContainerMap;
_starTreeIndexContainer = starTreeIndexContainer;
+ _segmentCallback = segmentCallback;
+
+ // create virtualColumnReaderWriter
+ Map<String, DataFileReader> virtualColumnsReaderWriter = new HashMap<>();
+ for (Map.Entry<String, ColumnIndexContainer> entry: columnIndexContainerMap.entrySet()) {
+ String columnName = entry.getKey();
+ ColumnIndexContainer container = entry.getValue();
+ if (segmentMetadata.getSchema().isVirtualColumn(columnName) && (container.getForwardIndex() instanceof VirtualColumnLongValueReaderWriter)) {
+ virtualColumnsReaderWriter.put(columnName, container.getForwardIndex());
+ }
+ }
+ _segmentCallback.init(segmentMetadata, virtualColumnsReaderWriter);
+ _segmentCallback.initOffsetColumn(columnIndexContainerMap.get(segmentMetadata.getSchema().getOffsetKey()));
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index 49d7729..198a164 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.segment.index.ColumnMetadata;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -44,8 +46,6 @@ import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-
public class ImmutableSegmentLoader {
private ImmutableSegmentLoader() {
@@ -60,7 +60,7 @@ public class ImmutableSegmentLoader {
throws Exception {
IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig();
defaultIndexLoadingConfig.setReadMode(readMode);
- return load(indexDir, defaultIndexLoadingConfig, null);
+ return load(indexDir, defaultIndexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, null);
}
/**
@@ -68,25 +68,18 @@ public class ImmutableSegmentLoader {
*/
public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadingConfig)
throws Exception {
- return load(indexDir, indexLoadingConfig, null);
+ return load(indexDir, indexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, null);
}
- public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadingConfig, @Nullable Schema schema)
+ public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadingConfig,
+ DataManagerCallback dataManagerCallback, @Nullable Schema schema)
throws Exception {
- return loadHelper(indexDir, indexLoadingConfig, schema);
+ return loadHelper(indexDir, indexLoadingConfig, dataManagerCallback, schema);
}
- /**
- * to load upsert related segment
- */
- public static ImmutableUpsertSegmentImpl loadUpsertSegment(File indexDir, IndexLoadingConfig indexLoadingConfig,
- Schema schema) throws Exception {
- ImmutableSegmentImpl segment = loadHelper(indexDir, indexLoadingConfig, schema);
- return ImmutableUpsertSegmentImpl.copyOf(segment);
- }
private static ImmutableSegmentImpl loadHelper(File indexDir, IndexLoadingConfig indexLoadingConfig,
- @Nullable Schema schema) throws Exception {
+ DataManagerCallback dataManagerCallback, @Nullable Schema schema) throws Exception {
Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: {} does not exist or is not a directory",
indexDir);
@@ -155,7 +148,8 @@ public class ImmutableSegmentLoader {
indexContainerMap, readMode);
}
- return new ImmutableSegmentImpl(segmentDirectory, segmentMetadata, indexContainerMap, starTreeIndexContainer);
+ return new ImmutableSegmentImpl(segmentDirectory, segmentMetadata, indexContainerMap, starTreeIndexContainer,
+ dataManagerCallback.getIndexSegmentCallback());
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index b80ac64..12fcd94 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.core.data.manager.upsert.IndexSegmentCallback;
import org.apache.pinot.core.indexsegment.IndexSegmentUtils;
import org.apache.pinot.core.io.reader.DataFileReader;
import org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
@@ -76,7 +77,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class MutableSegmentImpl implements MutableSegment {
+public class MutableSegmentImpl implements MutableSegment {
// For multi-valued column, forward-index.
// Maximum number of multi-values per row. We assert on this.
private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
@@ -124,9 +125,9 @@ public abstract class MutableSegmentImpl implements MutableSegment {
private final Collection<MetricFieldSpec> _physicalMetricFieldSpecs;
// Cache some virtual columns
- protected final Map<String, VirtualColumnProvider> _virtualColumnProviderMap = new HashMap<>();
- protected final Map<String, Dictionary> _virtualColumnDictionary = new HashMap<>();
- protected final Map<String, DataFileReader> _virtualColumnIndexReader = new HashMap<>();
+ private final Map<String, VirtualColumnProvider> _virtualColumnProviderMap = new HashMap<>();
+ private final Map<String, Dictionary> _virtualColumnDictionary = new HashMap<>();
+ private final Map<String, DataFileReader> _virtualColumnIndexReader = new HashMap<>();
// default message metadata
private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
@@ -134,7 +135,10 @@ public abstract class MutableSegmentImpl implements MutableSegment {
private RealtimeLuceneReaders _realtimeLuceneReaders;
- public MutableSegmentImpl(RealtimeSegmentConfig config) {
+ // upsert/append related components
+ private final IndexSegmentCallback _indexSegmentCallback;
+
+ public MutableSegmentImpl(RealtimeSegmentConfig config, IndexSegmentCallback indexSegmentCallback) {
_tableName = config.getTableName();
_segmentName = config.getSegmentName();
_schema = config.getSchema();
@@ -167,6 +171,7 @@ public abstract class MutableSegmentImpl implements MutableSegment {
_statsHistory = config.getStatsHistory();
_segmentPartitionConfig = config.getSegmentPartitionConfig();
_nullHandlingEnabled = config.isNullHandlingEnabled();
+ _indexSegmentCallback = indexSegmentCallback;
Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
@@ -302,7 +307,7 @@ public abstract class MutableSegmentImpl implements MutableSegment {
}
}
- for (FieldSpec fieldSpec : _physicalFieldSpecs) {
+ for (FieldSpec fieldSpec : virtualFieldSpecs) {
String column = fieldSpec.getName();
VirtualColumnContext virtualColumnContext = new VirtualColumnContext(fieldSpec, _capacity, true);
final VirtualColumnProvider provider =
@@ -317,6 +322,8 @@ public abstract class MutableSegmentImpl implements MutableSegment {
_virtualColumnDictionary.put(column, dictionary);
}
+ _indexSegmentCallback.init(_segmentMetadata, _virtualColumnIndexReader);
+
if (_realtimeLuceneReaders != null) {
// add the realtime lucene index readers to the global queue for refresh task to pick up
RealtimeLuceneIndexRefreshState realtimeLuceneIndexRefreshState = RealtimeLuceneIndexRefreshState.getInstance();
@@ -377,7 +384,7 @@ public abstract class MutableSegmentImpl implements MutableSegment {
// Update number of document indexed at last to make the latest record queryable
canTakeMore = _numDocsIndexed++ < _capacity;
- postProcessRecords(row, docId);
+ _indexSegmentCallback.postProcessRecords(row, docId);
} else {
Preconditions
.checkState(_aggregateMetrics, "Invalid document-id during indexing: " + docId + " expected: " + numDocs);
@@ -522,8 +529,6 @@ public abstract class MutableSegmentImpl implements MutableSegment {
}
}
- protected abstract void postProcessRecords(GenericRow row, int docId);
-
private boolean aggregateMetrics(GenericRow row, int docId) {
for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
String column = metricFieldSpec.getName();
@@ -589,7 +594,6 @@ public abstract class MutableSegmentImpl implements MutableSegment {
public ColumnDataSource getDataSource(String columnName) {
FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
if (fieldSpec.isVirtualColumn()) {
- // FIXME
VirtualColumnContext virtualColumnContext = new VirtualColumnContext(fieldSpec, _numDocsIndexed, true);
VirtualColumnProvider virtualColumnProvider = _virtualColumnProviderMap.get(columnName);
return new ColumnDataSource(virtualColumnProvider.buildColumnIndexContainer(virtualColumnContext),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
similarity index 70%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
index 97d48f3..01579e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
@@ -18,8 +18,16 @@
*/
package org.apache.pinot.core.segment.updater;
-public class SegmentUpdaterConfig {
- public static final String SEGMENT_UDPATE_SLEEP_MS = "sleep.ms";
- public static final String INPUT_TOPIC_PREFIX = "input.topic.prefix";
- public static final int SEGMENT_UDPATE_SLEEP_MS_DEFAULT = 100;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.grigio.common.metrics.GrigioMeter;
+import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+
+import java.util.Map;
+
+public interface WatermarkManager {
+
+ void init(Configuration config, GrigioMetrics metrics);
+
+ Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
+
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 9b28f5a..9626be2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -36,6 +36,9 @@ import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultIndexSegmentCallback;
+import org.apache.pinot.core.data.manager.upsert.DefaultTableDataManagerCallbackImpl;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.testng.Assert;
import org.testng.annotations.AfterSuite;
@@ -100,7 +103,7 @@ public class BaseTableDataManagerTest {
private TableDataManager makeTestableManager()
throws Exception {
- TableDataManager tableDataManager = new OfflineTableDataManager();
+ TableDataManager tableDataManager = new OfflineTableDataManager(new DefaultTableDataManagerCallbackImpl());
TableDataManagerConfig config;
{
config = mock(TableDataManagerConfig.class);
@@ -139,7 +142,7 @@ public class BaseTableDataManagerTest {
// Add the segment, get it for use, remove the segment, and then return it.
// Make sure that the segment is not destroyed before return.
ImmutableSegment immutableSegment = makeImmutableSegment(segmentName, totalDocs);
- tableDataManager.addSegment(immutableSegment);
+ tableDataManager.addSegment(immutableSegment, DefaultDataManagerCallbackImpl.INSTANCE);
SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
Assert.assertEquals(segmentDataManager.getReferenceCount(), 2);
tableDataManager.removeSegment(segmentName);
@@ -161,7 +164,7 @@ public class BaseTableDataManagerTest {
// Add a new segment and remove it in order this time.
final String anotherSeg = "AnotherSegment";
ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
- tableDataManager.addSegment(ix1);
+ tableDataManager.addSegment(ix1, DefaultDataManagerCallbackImpl.INSTANCE);
SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg);
Assert.assertNotNull(sdm1);
Assert.assertEquals(sdm1.getReferenceCount(), 2);
@@ -178,7 +181,7 @@ public class BaseTableDataManagerTest {
Assert.assertEquals(sdm1.getReferenceCount(), 1);
// Now replace the segment with another one.
ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1);
- tableDataManager.addSegment(ix2);
+ tableDataManager.addSegment(ix2, DefaultDataManagerCallbackImpl.INSTANCE);
// Now the previous one should have been destroyed, and
Assert.assertEquals(sdm1.getReferenceCount(), 0);
verify(ix1, times(1)).destroy();
@@ -222,7 +225,8 @@ public class BaseTableDataManagerTest {
for (int i = _lo; i <= _hi; i++) {
final String segName = SEGMENT_PREFIX + i;
- tableDataManager.addSegment(makeImmutableSegment(segName, random.nextInt()));
+ tableDataManager.addSegment(makeImmutableSegment(segName, random.nextInt()),
+ DefaultDataManagerCallbackImpl.INSTANCE);
_allSegManagers.add(_internalSegMap.get(segName));
}
@@ -409,7 +413,8 @@ public class BaseTableDataManagerTest {
private void addSegment() {
final int segmentToAdd = _hi + 1;
final String segName = SEGMENT_PREFIX + segmentToAdd;
- _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
+ _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()),
+ DefaultDataManagerCallbackImpl.INSTANCE);
_allSegManagers.add(_internalSegMap.get(segName));
_hi = segmentToAdd;
}
@@ -418,7 +423,8 @@ public class BaseTableDataManagerTest {
private void replaceSegment() {
int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo;
final String segName = SEGMENT_PREFIX + segToReplace;
- _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
+ _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()),
+ DefaultDataManagerCallbackImpl.INSTANCE);
_allSegManagers.add(_internalSegMap.get(segName));
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index c4b557b..2f8bc3f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
@@ -696,7 +697,7 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertEquals(secondSegmentDataManager.get().getPartitionConsumerSemaphore().availablePermits(), 1);
}
- public static class FakeLLRealtimeSegmentDataManager extends AppendLLRealtimeSegmentDataManager {
+ public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
public Field _state;
public Field _shouldStop;
@@ -729,7 +730,7 @@ public class LLRealtimeSegmentDataManagerTest {
throws Exception {
super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
- semaphoreMap.get(llcSegmentName.getPartitionId()), serverMetrics);
+ semaphoreMap.get(llcSegmentName.getPartitionId()), serverMetrics, DefaultDataManagerCallbackImpl.INSTANCE);
_state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
_state.setAccessible(true);
_shouldStop = LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 26e7eff..4ccf197 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.indexsegment.mutable;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.core.data.manager.upsert.DefaultIndexSegmentCallback;
import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
@@ -59,6 +60,6 @@ public class MutableSegmentImplTestUtils {
.setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
.setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).build();
- return new MutableAppendSegmentImpl(realtimeSegmentConfig);
+ return new MutableSegmentImpl(realtimeSegmentConfig, DefaultIndexSegmentCallback.INSTANCE);
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
index a4ee984..4d1d469 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
@@ -42,6 +42,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -141,7 +142,7 @@ public class SegmentGenerationWithNullValueVectorTest {
.getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
mock(ServerMetrics.class));
tableDataManager.start();
- tableDataManager.addSegment(_segment);
+ tableDataManager.addSegment(_segment, DefaultDataManagerCallbackImpl.INSTANCE);
_instanceDataManager = mock(InstanceDataManager.class);
when(_instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
index 59342a3..d71896c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFileFilter;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
@@ -225,12 +226,14 @@ public class LoaderTest {
schema.addField(new DimensionFieldSpec("SVString", FieldSpec.DataType.STRING, true, ""));
schema.addField(new DimensionFieldSpec("MVString", FieldSpec.DataType.STRING, false, ""));
- IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, _v1IndexLoadingConfig, schema);
+ IndexSegment indexSegment = ImmutableSegmentLoader
+ .load(_indexDir, _v1IndexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, schema);
Assert.assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0), "");
Assert.assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0), "");
indexSegment.destroy();
- indexSegment = ImmutableSegmentLoader.load(_indexDir, _v3IndexLoadingConfig, schema);
+ indexSegment = ImmutableSegmentLoader
+ .load(_indexDir, _v3IndexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, schema);
Assert.assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0), "");
Assert.assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0), "");
indexSegment.destroy();
@@ -245,7 +248,8 @@ public class LoaderTest {
FieldSpec byteMetric = new MetricFieldSpec(newColumnName, FieldSpec.DataType.BYTES, defaultValue);
schema.addField(byteMetric);
- IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, _v3IndexLoadingConfig, schema);
+ IndexSegment indexSegment = ImmutableSegmentLoader
+ .load(_indexDir, _v3IndexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, schema);
Assert
.assertEquals(BytesUtils.toHexString((byte[]) indexSegment.getDataSource(newColumnName).getDictionary().get(0)),
defaultValue);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
index c8e688b..b0f5a55 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
@@ -29,6 +29,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultIndexSegmentCallback;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.response.broker.AggregationResult;
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index b6ce1e2..203d2f6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -103,7 +104,7 @@ public class QueryExecutorTest {
mock(ServerMetrics.class));
tableDataManager.start();
for (ImmutableSegment indexSegment : _indexSegments) {
- tableDataManager.addSegment(indexSegment);
+ tableDataManager.addSegment(indexSegment, DefaultDataManagerCallbackImpl.INSTANCE);
}
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
diff --git a/pinot-grigio/pom.xml b/pinot-grigio/pinot-grigio-provided/pom.xml
similarity index 51%
copy from pinot-grigio/pom.xml
copy to pinot-grigio/pinot-grigio-provided/pom.xml
index e53fa72..5643070 100644
--- a/pinot-grigio/pom.xml
+++ b/pinot-grigio/pinot-grigio-provided/pom.xml
@@ -22,21 +22,38 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>pinot</artifactId>
- <groupId>org.apache.pinot</groupId>
- <version>0.3.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>pinot-grigio</artifactId>
- <packaging>pom</packaging>
- <name>Pinot Grigio</name>
- <url>http://maven.apache.org</url>
- <modules>
- <module>pinot-grigio-common</module>
- <module>pinot-grigio-coordinator</module>
- </modules>
+ <parent>
+ <artifactId>pinot-grigio</artifactId>
+ <groupId>org.apache.pinot</groupId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>pinot-grigio-provided</artifactId>
+ <name>Pinot Grigio Provided components</name>
<properties>
- <pinot.root>${basedir}/..</pinot.root>
- </properties>
+ <pinot.root>${basedir}/../..</pinot.root>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+
+
</project>
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertLLRealtimeSegmentDataManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertDataManagerCallbackImpl.java
similarity index 57%
rename from pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertLLRealtimeSegmentDataManager.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertDataManagerCallbackImpl.java
index f08037a..24731f2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertLLRealtimeSegmentDataManager.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertDataManagerCallbackImpl.java
@@ -16,20 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.data.manager.realtime;
+package org.apache.pinot.core.data.manager.upsert;
import com.google.common.base.Preconditions;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
-import org.apache.pinot.core.indexsegment.UpsertSegment;
-import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
-import org.apache.pinot.core.indexsegment.mutable.MutableUpsertSegmentImpl;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.core.segment.updater.SegmentUpdater;
import org.apache.pinot.grigio.common.messages.KeyCoordinatorQueueMsg;
import org.apache.pinot.grigio.common.rpcQueue.ProduceTask;
@@ -40,71 +34,74 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.Semaphore;
-/**
- * class design is pretty bad right now, need to rework inheritance to abstract the base class or use composition instead
- */
-public class UpsertLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager implements UpsertSegmentDataManager {
+public class UpsertDataManagerCallbackImpl implements DataManagerCallback {
- private final QueueProducer _keyCoordinatorQueueProducer;
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertDataManagerCallbackImpl.class);
- public UpsertLLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
- RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
- Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics) throws Exception {
- super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, indexLoadingConfig, schema,
- llcSegmentName, partitionConsumerSemaphore, serverMetrics);
- Preconditions.checkState(_schema.getPrimaryKeyFieldSpec() != null, "primary key not found");
- Preconditions.checkState(_schema.getOffsetKeyFieldSpec() != null, "offset key not found");
+ private final String _tableNameWithType;
+ private final QueueProducer _keyCoordinatorQueueProducer;
+ private final String _segmentName;
+ private final Schema _schema;
+ private final ServerMetrics _serverMetrics;
+ private IndexSegmentCallback _indexSegmentCallback;
+
+ public UpsertDataManagerCallbackImpl(String tableNameWithType, String segmentName, Schema schema,
+ ServerMetrics serverMetrics, boolean isMutable) {
+ Preconditions.checkState(schema.getPrimaryKeyFieldSpec() != null, "primary key not found");
+ Preconditions.checkState(schema.getOffsetKeyFieldSpec() != null, "offset key not found");
+ _tableNameWithType = TableNameBuilder.ensureTableNameWithType(tableNameWithType,
+ CommonConstants.Helix.TableType.REALTIME);
+ _segmentName = segmentName;
+ _schema = schema;
+ _serverMetrics = serverMetrics;
_keyCoordinatorQueueProducer = KeyCoordinatorProvider.getInstance().getCachedProducer(_tableNameWithType);
- initVirtualColumns();
- }
-
- public String getSegmentName() {
- return _segmentNameStr;
+ if (isMutable) {
+ _indexSegmentCallback = new UpsertMutableIndexSegmentCallback();
+ } else {
+ _indexSegmentCallback = new UpsertImmutableIndexSegmentCallback();
+ }
}
@Override
- public void updateVirtualColumns(List<UpdateLogEntry> messages) {
- ((UpsertSegment) _realtimeSegment).updateVirtualColumn(messages);
+ public synchronized IndexSegmentCallback getIndexSegmentCallback() {
+ return _indexSegmentCallback;
}
@Override
- public String getVirtualColumnInfo(long offset) {
- return ((UpsertSegment) _realtimeSegment).getVirtualColumnInfo(offset);
+ public void processTransformedRow(GenericRow row, long offset) {
+ row.putValue(_schema.getOffsetKey(), offset);
}
@Override
- public void destroy() {
- SegmentUpdater.getInstance().removeSegmentDataManager(_tableNameWithType, _llcSegmentName.getSegmentName(), this);
- super.destroy();
+ public void postIndexProcessing(GenericRow row, long offset) {
+ emitEventToKeyCoordinator(row, offset);
}
@Override
- protected MutableSegmentImpl createMutableSegment(RealtimeSegmentConfig config) {
- return new MutableUpsertSegmentImpl(config);
+ public void postConsumeLoop() {
+ _keyCoordinatorQueueProducer.flush();
}
- @Override
- protected void processTransformedRow(GenericRow row, long offset) {
- row.putField(_schema.getOffsetKey(), offset);
+ public void updateVirtualColumns(List<UpdateLogEntry> messages) {
+ _indexSegmentCallback.updateVirtualColumn(messages);
}
- @Override
- protected void postIndexProcessing(GenericRow row, long offset) {
- emitEventToKeyCoordinator(row, offset);
+ public String getVirtualColumnInfo(long offset) {
+ return _indexSegmentCallback.getVirtualColumnInfo(offset);
}
- @Override
- protected boolean consumeLoop() throws Exception {
- boolean result = super.consumeLoop();
- segmentLogger.info("flushing kafka producer");
- _keyCoordinatorQueueProducer.flush();
- segmentLogger.info("done flushing kafka producer");
- return result;
+ public void destroy() {
+ try {
+ SegmentUpdater.getInstance().removeSegmentDataManager(_tableNameWithType, _segmentName, this);
+ } catch (Exception ex) {
+ LOGGER.error("failed to destroy data manager callback for table {} segment {}", _tableNameWithType, _segmentName, ex); // still best-effort (no rethrow), but keep the cause and identifiers in the log
+ }
}
/**
@@ -116,7 +113,7 @@ public class UpsertLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataMan
final byte[] primaryKeyBytes = getPrimaryKeyBytesFromRow(row);
final long timestampMillis = getTimestampFromRow(row);
ProduceTask<byte[], KeyCoordinatorQueueMsg> task = new ProduceTask<>(primaryKeyBytes,
- new KeyCoordinatorQueueMsg(primaryKeyBytes, _segmentNameStr, timestampMillis, offset));
+ new KeyCoordinatorQueueMsg(primaryKeyBytes, _segmentName, timestampMillis, offset));
task.setCallback(new ProduceTask.Callback() {
@Override
public void onSuccess() {
@@ -143,11 +140,12 @@ public class UpsertLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataMan
return spec.getIncomingGranularitySpec().toMillis(row.getValue(spec.getIncomingTimeColumnName()));
}
-
- private void initVirtualColumns() throws IOException {
+ @Override
+ public void initVirtualColumns() throws IOException {
// 1. ensure the data manager can capture all update events
// 2. load all existing messages
- SegmentUpdater.getInstance().addSegmentDataManager(_tableNameWithType, _llcSegmentName, this);
- ((UpsertSegment) _realtimeSegment).initVirtualColumn();
+ SegmentUpdater.getInstance().addSegmentDataManager(_tableNameWithType, new LLCSegmentName(_segmentName), this);
+ _indexSegmentCallback.initVirtualColumn();
}
+
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
similarity index 74%
rename from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
index 5ab42f5..1ddc963 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
@@ -16,97 +16,86 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.indexsegment.immutable;
+package org.apache.pinot.core.data.manager.upsert;
+
import com.clearspring.analytics.util.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.core.indexsegment.UpsertSegment;
import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
import org.apache.pinot.core.io.reader.DataFileReader;
-import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
import org.apache.pinot.core.segment.index.readers.Dictionary;
-import org.apache.pinot.core.segment.store.SegmentDirectory;
-import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
-import org.apache.pinot.core.startree.v2.store.StarTreeIndexContainer;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
-public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements UpsertSegment {
+public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback {
- private static final Logger LOGGER = LoggerFactory.getLogger(ImmutableUpsertSegmentImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertImmutableIndexSegmentCallback.class);
- private final List<VirtualColumnLongValueReaderWriter> _virtualColumnsReaderWriter;
- private final String _tableNameWithType;
- private final String _segmentName;
- private final int _totalDoc;
+ private List<VirtualColumnLongValueReaderWriter> _virtualColumnsReaderWriter;
+ private String _tableNameWithType;
+ private String _segmentName;
+ private int _totalDoc;
private long _minSourceOffset;
- private final UpsertWaterMarkManager _upsertWaterMarkManager;
- private final UpdateLogStorageProvider _updateLogStorageProvider;
+ private UpsertWatermarkManager _upsertWatermarkManager;
+ private UpdateLogStorageProvider _updateLogStorageProvider;
// use array for mapping bewteen offset to docId, where actual offset = min_offset + array_index
// use 4 bytes per record
private int[] _sourceOffsetToDocIdArray;
private static final int DEFAULT_DOC_ID_FOR_MISSING_ENTRY = -1;
- public ImmutableUpsertSegmentImpl(SegmentDirectory segmentDirectory,
- SegmentMetadataImpl segmentMetadata,
- Map<String, ColumnIndexContainer> columnIndexContainerMap,
- @Nullable StarTreeIndexContainer starTreeIndexContainer) {
- super(segmentDirectory, segmentMetadata, columnIndexContainerMap, starTreeIndexContainer);
+ public UpsertImmutableIndexSegmentCallback() {}
+
+ @Override
+ public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) {
Preconditions.checkState(segmentMetadata.getSchema().isTableForUpsert(), "table should be upsert but it is not");
_tableNameWithType = TableNameBuilder.ensureTableNameWithType(segmentMetadata.getTableName(),
CommonConstants.Helix.TableType.REALTIME);
_segmentName = segmentMetadata.getName();
_totalDoc = segmentMetadata.getTotalDocs();
- _upsertWaterMarkManager = UpsertWaterMarkManager.getInstance();
+ _upsertWatermarkManager = UpsertWatermarkManager.getInstance();
_updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
_virtualColumnsReaderWriter = new ArrayList<>();
- for (Map.Entry<String, ColumnIndexContainer> entry: columnIndexContainerMap.entrySet()) {
- String columnName = entry.getKey();
- ColumnIndexContainer container = entry.getValue();
- if (segmentMetadata.getSchema().isVirtualColumn(columnName) && (container.getForwardIndex() instanceof VirtualColumnLongValueReaderWriter)) {
- _virtualColumnsReaderWriter.add((VirtualColumnLongValueReaderWriter) container.getForwardIndex());
- }
+ for (DataFileReader reader: virtualColumnIndexReader.values()) {
+ if (reader instanceof VirtualColumnLongValueReaderWriter) { _virtualColumnsReaderWriter.add((VirtualColumnLongValueReaderWriter) reader); } // skip other reader types instead of failing the cast, as the removed loop above did
}
- buildOffsetToDocIdMap(columnIndexContainerMap.get(segmentMetadata.getSchema().getOffsetKey()));
}
- /** constructor used for creating instance in test cases
- * should not be used for creating regular segment
+ /**
+ * method for testing purpose only
*/
@VisibleForTesting
- protected ImmutableUpsertSegmentImpl(List<VirtualColumnLongValueReaderWriter> readerWriters,
- int totalDoc, UpsertWaterMarkManager manager,
- UpdateLogStorageProvider updateLogStorageProvider,
- long minSourceOffset, int[] offsetToDocId) {
- super(null, null, null, null);
+ protected void init(List<VirtualColumnLongValueReaderWriter> readerWriters,
+ int totalDoc, UpsertWatermarkManager manager,
+ UpdateLogStorageProvider updateLogStorageProvider,
+ long minSourceOffset, int[] offsetToDocId) {
_tableNameWithType = "testTable";
_segmentName = "testSegment";
_virtualColumnsReaderWriter = readerWriters;
_totalDoc = totalDoc;
- _upsertWaterMarkManager = manager;
+ _upsertWatermarkManager = manager;
_updateLogStorageProvider = updateLogStorageProvider;
_minSourceOffset = minSourceOffset;
_sourceOffsetToDocIdArray = offsetToDocId;
-
}
- private void buildOffsetToDocIdMap(ColumnIndexContainer offsetColumnIndexContainer) {
+
+ @Override
+ public void initOffsetColumn(ColumnIndexContainer offsetColumnIndexContainer) {
long start = System.currentTimeMillis();
final DataFileReader reader = offsetColumnIndexContainer.getForwardIndex();
final Dictionary dictionary = offsetColumnIndexContainer.getDictionary();
@@ -143,34 +132,9 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
LOGGER.info("built offset to DocId map for segment {} with {} documents in {} ms", _segmentName, _totalDoc, System.currentTimeMillis() - start);
}
- public static ImmutableUpsertSegmentImpl copyOf(ImmutableSegmentImpl immutableSegment) {
-
- return new ImmutableUpsertSegmentImpl(immutableSegment._segmentDirectory, immutableSegment._segmentMetadata,
- immutableSegment._indexContainerMap, immutableSegment._starTreeIndexContainer);
- }
-
- @Override
- public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
- for (UpdateLogEntry logEntry: logEntries) {
- boolean updated = false;
- int docId = getDocIdFromSourceOffset(logEntry.getOffset());
- for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
- updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated;
- }
- if (updated) {
- _upsertWaterMarkManager.processMessage(_tableNameWithType, _segmentName, logEntry);
- }
- }
- }
-
@Override
- public String getVirtualColumnInfo(long offset) {
- int docId = getDocIdFromSourceOffset(offset);
- StringBuilder result = new StringBuilder("matched: ");
- for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
- result.append(readerWriter.getInt(docId)).append("; ");
- }
- return result.toString();
+ public void postProcessRecords(GenericRow row, int docId) {
+ // do nothing
}
/**
@@ -216,7 +180,7 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
unmatchedLogEntryCount);
}
partitionToHighestWatermark.forEach((partition, value) ->
- _upsertWaterMarkManager.processVersionUpdate(_tableNameWithType, partition, value));
+ _upsertWatermarkManager.processVersionUpdate(_tableNameWithType, partition, value));
} catch (Exception e) {
LOGGER.error("failed to load the offset with thread pool");
@@ -226,6 +190,30 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
_segmentName, System.currentTimeMillis() - start);
}
+ @Override
+ public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
+ for (UpdateLogEntry logEntry: logEntries) {
+ boolean updated = false;
+ int docId = getDocIdFromSourceOffset(logEntry.getOffset());
+ for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
+ updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated;
+ }
+ if (updated) {
+ _upsertWatermarkManager.processMessage(_tableNameWithType, _segmentName, logEntry);
+ }
+ }
+ }
+
+ @Override
+ public String getVirtualColumnInfo(long offset) {
+ int docId = getDocIdFromSourceOffset(offset);
+ StringBuilder result = new StringBuilder("matched: ");
+ for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
+ result.append(readerWriter.getInt(docId)).append("; ");
+ }
+ return result.toString();
+ }
+
/**
* given a offset from source kafka topic, return the docId associated with it
* throw exception if there is no docId for the associated kafka offset (because kafka offset might not be continuous)
@@ -246,6 +234,6 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
return _sourceOffsetToDocIdArray[position];
}
}
-
}
+
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
similarity index 76%
rename from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
index a6e7728..eeda5a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
@@ -16,18 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.indexsegment.mutable;
+package org.apache.pinot.core.data.manager.upsert;
import com.google.common.base.Preconditions;
-import org.apache.pinot.core.indexsegment.UpsertSegment;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.io.reader.DataFileReader;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.grigio.common.messages.LogEventType;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,13 +41,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements UpsertSegment {
+public class UpsertMutableIndexSegmentCallback implements IndexSegmentCallback {
- private static final Logger LOGGER = LoggerFactory.getLogger(MutableUpsertSegmentImpl.class);
- private final UpsertWaterMarkManager upsertWaterMarkManager;
-
- private final String _kafkaOffsetColumnName;
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertMutableIndexSegmentCallback.class);
+ private String _tableName;
+ private String _segmentName;
+ private Schema _schema;
+ private UpsertWatermarkManager _upsertWatermarkManager;
+ private String _offsetColumnName;
private final List<VirtualColumnLongValueReaderWriter> _mutableSegmentReaderWriters = new ArrayList<>();
// use map for mapping between kafka offset and docId because we at-most have 1 mutable segment per consumer
// will use more memory, but we can update this later
@@ -54,23 +59,31 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
private final Map<Long, UpdateLogEntry> _unmatchedInsertRecords = new ConcurrentHashMap<>();
private final Map<Long, UpdateLogEntry> _unmatchedDeleteRecords = new ConcurrentHashMap<>();
- public MutableUpsertSegmentImpl(RealtimeSegmentConfig config) {
- super(config);
- _kafkaOffsetColumnName = _schema.getOffsetKey();
+ @Override
+ public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) {
+ _tableName = TableNameBuilder.ensureTableNameWithType(segmentMetadata.getTableName(),
+ CommonConstants.Helix.TableType.REALTIME);
+ _segmentName = segmentMetadata.getName();
+ _schema = segmentMetadata.getSchema();
+ _offsetColumnName = _schema.getOffsetKey();
Preconditions.checkState(_schema.isTableForUpsert(), "table should be upsert");
- for (Map.Entry<String, DataFileReader> entry: _virtualColumnIndexReader.entrySet()) {
+ for (Map.Entry<String, DataFileReader> entry: virtualColumnIndexReader.entrySet()) {
String column = entry.getKey();
DataFileReader reader = entry.getValue();
if (reader instanceof VirtualColumnLongValueReaderWriter) {
- _logger.info("adding virtual column {} to updatable reader writer list", column);
+ LOGGER.info("adding virtual column {} to updatable reader writer list", column);
_mutableSegmentReaderWriters.add((VirtualColumnLongValueReaderWriter) reader);
}
}
- upsertWaterMarkManager = UpsertWaterMarkManager.getInstance();
+ _upsertWatermarkManager = UpsertWatermarkManager.getInstance();
LOGGER.info("starting upsert segment with {} reader writer", _mutableSegmentReaderWriters.size());
}
@Override
+ public void initOffsetColumn(ColumnIndexContainer offsetColumnContainer) {
+ // do nothing
+ }
+
public synchronized void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
for (UpdateLogEntry logEntry: logEntries) {
boolean updated = false;
@@ -83,7 +96,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
}
if (updated) {
// only update high water mark if it indeed updated something
- upsertWaterMarkManager.processMessage(_tableName, _segmentName, logEntry);
+ _upsertWatermarkManager.processMessage(_tableName, _segmentName, logEntry);
}
}
if (!offsetFound) {
@@ -92,7 +105,6 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
}
}
- @Override
public String getVirtualColumnInfo(long offset) {
Integer docId = _sourceOffsetToDocId.get(offset);
StringBuilder result = new StringBuilder("matched: ");
@@ -113,8 +125,8 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
}
@Override
- protected synchronized void postProcessRecords(GenericRow row, int docId) {
- final Long offset = (Long) row.getValue(_kafkaOffsetColumnName);
+ public synchronized void postProcessRecords(GenericRow row, int docId) {
+ final Long offset = (Long) row.getValue(_offsetColumnName);
for (VirtualColumnLongValueReaderWriter readerWriter: _mutableSegmentReaderWriters) {
readerWriter.addNewRecord(docId);
}
@@ -123,9 +135,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
checkForOutstandingRecords(_unmatchedInsertRecords, offset, docId);
}
- @Override
public void initVirtualColumn() throws IOException {
- Preconditions.checkState(_numDocsIndexed == 0, "should init virtual column before ingestion");
UpdateLogEntrySet updateLogEntries = UpdateLogStorageProvider.getInstance().getAllMessages(_tableName, _segmentName);
LOGGER.info("got {} update log entries for current segment {}", updateLogEntries.size(), _segmentName);
// some physical data might have been ingested when we init virtual column, we will go through the normal update
@@ -133,6 +143,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
updateVirtualColumn(updateLogEntries);
}
+
private void checkForOutstandingRecords(Map<Long, UpdateLogEntry> unmatchRecordsMap, Long offset, int docId) {
UpdateLogEntry unmatchedEntry = unmatchRecordsMap.remove(offset);
if (unmatchedEntry != null) {
@@ -141,7 +152,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
updated = readerWriter.update(docId, unmatchedEntry.getValue(), unmatchedEntry.getType()) || updated;
}
if (updated) {
- upsertWaterMarkManager.processMessage(_tableName, _segmentName, unmatchedEntry);
+ _upsertWatermarkManager.processMessage(_tableName, _segmentName, unmatchedEntry);
}
}
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
new file mode 100644
index 0000000..dc329e4
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.upsert;
+
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class UpsertTableDataManagerCallbackImpl implements TableDataManagerCallback {
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertTableDataManagerCallbackImpl.class);
+
+ private UpdateLogStorageProvider _updateLogStorageProvider;
+
+ @Override
+ public void init() {
+ _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
+ }
+
+ @Override
+ public void addSegment(String tableName, String segmentName, TableConfig tableConfig) {
+ try {
+ _updateLogStorageProvider.addSegment(tableName, segmentName);
+ } catch (IOException e) {
+ LOGGER.error("failed to add update log for segment {} {}", tableName, segmentName);
+ ExceptionUtils.rethrow(e);
+ }
+ }
+
+ @Override
+ public DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
+ ServerMetrics serverMetrics, boolean isMutable) {
+ return new UpsertDataManagerCallbackImpl(tableName, segmentName, schema, serverMetrics, isMutable);
+ }
+
+ @Override
+ public DataManagerCallback getDefaultDataManagerCallback() {
+ throw new NotImplementedException("cannot create DefaultDataManagerCallback from UpsertTableDataManagerCallback");
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
similarity index 94%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
index d0d8c0e..530cd0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
@@ -26,7 +26,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
import org.apache.pinot.grigio.common.messages.LogCoordinatorMessage;
import org.apache.pinot.grigio.common.metrics.GrigioMeter;
import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
@@ -75,7 +75,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
private final String _topicPrefix;
private final ExecutorService _ingestionExecutorService;
private final QueueConsumer _consumer;
- private final Map<String, Map<String, Set<UpsertSegmentDataManager>>> _tableSegmentMap = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, Set<DataManagerCallback>>> _tableSegmentMap = new ConcurrentHashMap<>();
private final Map<String, Map<Integer, Long>> _tablePartitionCreationTime = new ConcurrentHashMap<>();
private final UpdateLogStorageProvider _updateLogStorageProvider;
private final UpdateLogRetentionManager _retentionManager;
@@ -91,7 +91,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
_topicPrefix = conf.getString(SegmentUpdaterConfig.INPUT_TOPIC_PREFIX);
_updateSleepMs = conf.getInt(SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS,
SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS_DEFAULT);
- UpsertWaterMarkManager.init(metrics);
+ UpsertWatermarkManager.init(metrics);
_consumer = provider.getConsumer();
_ingestionExecutorService = Executors.newFixedThreadPool(1);
_updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
@@ -163,7 +163,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
String tableName = TableNameBuilder.ensureTableNameWithType(entry.getKey(), CommonConstants.Helix.TableType.REALTIME);
int tableMessageCount = 0;
if (_tableSegmentMap.containsKey(tableName)) {
- final Map<String, Set<UpsertSegmentDataManager>> segmentManagersMap = _tableSegmentMap.get(tableName);
+ final Map<String, Set<DataManagerCallback>> segmentManagersMap = _tableSegmentMap.get(tableName);
final TableUpdateLogs segment2UpdateLogsMap = entry.getValue();
updateSegmentVirtualColumns(tableName, segmentManagersMap, segment2UpdateLogsMap, timeToStoreUpdateLogs);
} else {
@@ -179,7 +179,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
if (System.currentTimeMillis() - lastReportedTime > LOGGER_TIME_GAP_MS) {
lastReportedTime = System.currentTimeMillis();
LOGGER.info("processed {} messages in {} ms", eventCount, System.currentTimeMillis() - loopStartTime);
- LOGGER.info("latest high water mark is {}", UpsertWaterMarkManager.getInstance().toString());
+ LOGGER.info("latest high water mark is {}", UpsertWatermarkManager.getInstance().toString());
}
_consumer.ackOffset();
_metrics.addTimedValueMs(GrigioTimer.SEGMENT_UPDATER_LOOP_TIME, System.currentTimeMillis() - loopStartTime);
@@ -202,7 +202,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
/**
* Update the virtual columns of affected segments of a table.
*/
- private void updateSegmentVirtualColumns(String tableName, Map<String, Set<UpsertSegmentDataManager>> segmentManagersMap,
+ private void updateSegmentVirtualColumns(String tableName, Map<String, Set<DataManagerCallback>> segmentManagersMap,
TableUpdateLogs segment2UpdateLogsMap, AtomicLong timeToStoreUpdateLogs) throws IOException{
for (Map.Entry<String, List<UpdateLogEntry>> segmentEntry : segment2UpdateLogsMap.getSegments2UpdateLog().entrySet()) {
final String segmentNameStr = segmentEntry.getKey();
@@ -217,7 +217,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
* from consuming to online (mutable segment to immutable segment). In most of cases we expect only one segment manager
* in this set of UpsertSegmentDataManager
*/
- private void updateVirtualColumn(String table, String segment, Set<UpsertSegmentDataManager> segmentDataManagers,
+ private void updateVirtualColumn(String table, String segment, Set<DataManagerCallback> segmentDataManagers,
List<UpdateLogEntry> messages, AtomicLong timeToStoreUpdateLogs) throws IOException {
LOGGER.debug("updating segment {} with {} results for {} data managers", segment, messages.size(),
segmentDataManagers.size());
@@ -225,7 +225,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
storeUpdateLogs(table, segment, messages, timeToStoreUpdateLogs);
}
try {
- for (UpsertSegmentDataManager dataManager: segmentDataManagers) {
+ for (DataManagerCallback dataManager: segmentDataManagers) {
dataManager.updateVirtualColumns(messages);
}
} catch (Exception ex) {
@@ -239,7 +239,8 @@ public class SegmentUpdater implements SegmentDeletionListener {
* @param segmentName
* @param dataManager the data manager for the current given table/segment combination
*/
- public synchronized void addSegmentDataManager(String tableNameWithType, LLCSegmentName segmentName, UpsertSegmentDataManager dataManager) {
+ public synchronized void addSegmentDataManager(String tableNameWithType, LLCSegmentName segmentName,
+ DataManagerCallback dataManager) {
// TODO get partition assignment from
LOGGER.info("segment updater adding table {} segment {}", tableNameWithType, segmentName.getSegmentName());
if (!_tableSegmentMap.containsKey(tableNameWithType)) {
@@ -258,11 +259,12 @@ public class SegmentUpdater implements SegmentDeletionListener {
}
}
- public synchronized void removeSegmentDataManager(String tableNameWithType, String segmentName, UpsertSegmentDataManager toDeleteManager) {
+ public synchronized void removeSegmentDataManager(String tableNameWithType, String segmentName,
+ DataManagerCallback toDeleteManager) {
LOGGER.info("segment updater removing table {} segment {}", tableNameWithType, segmentName);
- Map<String, Set<UpsertSegmentDataManager>> segmentMap = _tableSegmentMap.get(tableNameWithType);
+ Map<String, Set<DataManagerCallback>> segmentMap = _tableSegmentMap.get(tableNameWithType);
if (segmentMap != null) {
- Set<UpsertSegmentDataManager> segmentDataManagers = segmentMap.get(segmentName);
+ Set<DataManagerCallback> segmentDataManagers = segmentMap.get(segmentName);
if (segmentDataManagers != null) {
segmentDataManagers.remove(toDeleteManager);
if (segmentDataManagers.size() == 0) {
@@ -298,7 +300,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
@Override
public synchronized void onSegmentDeletion(String tableNameWithType, String segmentName) {
LOGGER.info("deleting segment virtual column from local storage for table {} segment {}", tableNameWithType, segmentName);
- Map<String, Set<UpsertSegmentDataManager>> segmentManagerMap = _tableSegmentMap.get(tableNameWithType);
+ Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableNameWithType);
if (segmentManagerMap != null) {
if (segmentManagerMap.containsKey(segmentName) && segmentManagerMap.get(segmentName).size() > 0) {
LOGGER.error("trying to remove segment storage with {} segment data manager", segmentManagerMap.get(segmentName).size());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
similarity index 100%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
copy to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
similarity index 86%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
index bbea037..faad27b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.updater;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import org.apache.commons.configuration.Configuration;
import org.apache.pinot.grigio.common.metrics.GrigioGauge;
import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
@@ -30,26 +31,26 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class UpsertWaterMarkManager {
+public class UpsertWatermarkManager implements WatermarkManager {
private final Map<String, Map<Integer, Long>> _highWaterMarkTablePartitionMap = new ConcurrentHashMap<>();
private final GrigioMetrics _metrics;
- private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWaterMarkManager.class);
- private static volatile UpsertWaterMarkManager _instance;
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWatermarkManager.class);
+ private static volatile UpsertWatermarkManager _instance;
- private UpsertWaterMarkManager(GrigioMetrics metrics) {
+ private UpsertWatermarkManager(GrigioMetrics metrics) {
_metrics = metrics;
}
public static void init(GrigioMetrics metrics) {
- synchronized (UpsertWaterMarkManager.class) {
+ synchronized (UpsertWatermarkManager.class) {
Preconditions.checkState(_instance == null, "upsert water mark manager is already init");
- _instance = new UpsertWaterMarkManager(metrics);
+ _instance = new UpsertWatermarkManager(metrics);
}
}
- public static UpsertWaterMarkManager getInstance() {
+ public static UpsertWatermarkManager getInstance() {
Preconditions.checkState(_instance != null, "upsert water mark manager is not yet init");
return _instance;
}
@@ -79,6 +80,11 @@ public class UpsertWaterMarkManager {
return ImmutableMap.copyOf(_highWaterMarkTablePartitionMap.getOrDefault(tableName, ImmutableMap.of()));
}
+ @Override
+ public void init(Configuration config, GrigioMetrics metrics) {
+ // NOTE(review): intentionally empty? The singleton is built by the static init(GrigioMetrics); config and metrics are ignored here - confirm.
+ }
+
public Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap() {
return ImmutableMap.copyOf(_highWaterMarkTablePartitionMap);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
similarity index 89%
rename from pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
rename to pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
index 817131c..979e5c1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.indexsegment.immutable;
+package org.apache.pinot.core.data.manager.upsert;
-import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.grigio.common.messages.LogEventType;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
@@ -38,16 +38,15 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class ImmutableUpsertSegmentImplTest {
-
+public class UpsertImmutableIndexSegmentCallbackTest {
UpdateLogStorageProvider _mockProvider;
- UpsertWaterMarkManager _mockUpsertWaterMarkManager;
+ UpsertWatermarkManager _mockUpsertWatermarkManager;
List<VirtualColumnLongValueReaderWriter> _readerWriters = new ArrayList<>();
@BeforeMethod
public void init() {
_mockProvider = mock(UpdateLogStorageProvider.class);
- _mockUpsertWaterMarkManager = mock(UpsertWaterMarkManager.class);
+ _mockUpsertWatermarkManager = mock(UpsertWatermarkManager.class);
}
@Test
@@ -105,10 +104,10 @@ public class ImmutableUpsertSegmentImplTest {
start = System.currentTimeMillis();
- ImmutableUpsertSegmentImpl immutableUpsertSegment = new ImmutableUpsertSegmentImpl(_readerWriters, totalDocs,
- _mockUpsertWaterMarkManager, _mockProvider, minOffset, offsetToDocId);
+ UpsertImmutableIndexSegmentCallback callback = new UpsertImmutableIndexSegmentCallback();
+ callback.init(_readerWriters, totalDocs, _mockUpsertWatermarkManager, _mockProvider, minOffset, offsetToDocId);
+ callback.initVirtualColumn();
- immutableUpsertSegment.initVirtualColumn();
long runtime = System.currentTimeMillis() - start;
System.out.println("run time is " + runtime);
// on regular developer laptop this should take less 1 second, but on integration server this might be longer
diff --git a/pinot-grigio/pom.xml b/pinot-grigio/pom.xml
index e53fa72..47ad035 100644
--- a/pinot-grigio/pom.xml
+++ b/pinot-grigio/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>pinot-grigio-common</module>
<module>pinot-grigio-coordinator</module>
+ <module>pinot-grigio-provided</module>
</modules>
<properties>
<pinot.root>${basedir}/..</pinot.root>
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 1af7e29..920b422 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -41,11 +41,11 @@ import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.core.segment.index.loader.LoaderUtils;
-import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -219,12 +219,18 @@ public class HelixInstanceDataManager implements InstanceDataManager {
// Copy from segment backup directory back to index directory
FileUtils.copyDirectory(segmentBackupDir, indexDir);
+ final TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+ final DataManagerCallback dataManagerCallback = tableDataManager.getTableDataManagerCallback()
+ .getDataManagerCallback(tableNameWithType, segmentName, schema, _serverMetrics, false);
+
// Load from index directory
- ImmutableSegment immutableSegment = ImmutableSegmentLoader
- .load(indexDir, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), schema);
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir,
+ new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig),
+ dataManagerCallback,
+ schema);
// Replace the old segment in memory
- _tableDataManagerMap.get(tableNameWithType).addSegment(immutableSegment);
+ _tableDataManagerMap.get(tableNameWithType).addSegment(immutableSegment, dataManagerCallback);
// Rename segment backup directory to segment temporary directory (atomic)
// The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
@@ -301,10 +307,12 @@ public class HelixInstanceDataManager implements InstanceDataManager {
}
}
+ /*
@Override
public Map<String, Map<Integer, Long>> getLowWaterMarks() {
return UpsertWaterMarkManager.getInstance().getHighWaterMarkTablePartitionMap();
}
+ */
@Override
public String getSegmentDataDirectory() {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index f46023d..c1ea182 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -64,6 +64,8 @@ import org.apache.pinot.server.conf.ServerConf;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.upsert.UpsertComponentContainer;
+import org.apache.pinot.server.upsert.UpsertComponentContainerProvider;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 910a6b9..145e378 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.server.upsert.SegmentDeletionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,12 +54,19 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
private final String _instanceId;
private final InstanceDataManager _instanceDataManager;
private final SegmentFetcherAndLoader _fetcherAndLoader;
+ private final SegmentDeletionHandler _segmentDeletionHandler;
public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
SegmentFetcherAndLoader fetcherAndLoader) {
+ this(instanceId, instanceDataManager, fetcherAndLoader, new SegmentDeletionHandler());
+ }
+ // Overload taking the SegmentDeletionHandler explicitly; the constructor above delegates here with a handler that has no listeners.
+ public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
+ SegmentFetcherAndLoader fetcherAndLoader, SegmentDeletionHandler segmentDeletionHandler) {
_instanceId = instanceId;
_instanceDataManager = instanceDataManager;
_fetcherAndLoader = fetcherAndLoader;
+ _segmentDeletionHandler = segmentDeletionHandler;
}
public static String getStateModelName() {
@@ -203,6 +211,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
FileUtils.deleteQuietly(segmentDir);
_logger.info("Deleted segment directory {}", segmentDir);
}
+ _segmentDeletionHandler.deleteSegmentFromLocalStorage(tableNameWithType, segmentName);
} catch (final Exception e) {
_logger.error("Cannot delete the segment : " + segmentName + " from local directory!\n" + e.getMessage(), e);
Utils.rethrowException(e);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
similarity index 57%
rename from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
rename to pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
index 1c9effc..63424ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
@@ -16,22 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.indexsegment.mutable;
+package org.apache.pinot.server.upsert;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.configuration.Configuration;
-public class MutableAppendSegmentImpl extends MutableSegmentImpl {
- private static final Logger LOGGER = LoggerFactory.getLogger(MutableUpsertSegmentImpl.class);
+public class DefaultUpsertComponentContainer implements UpsertComponentContainer {
- public MutableAppendSegmentImpl(RealtimeSegmentConfig config) {
- super(config);
+ private final SegmentDeletionHandler deletionHandler = new SegmentDeletionHandler();
+ // The hooks below (registerMetrics, init, start, stop) have empty bodies: this default implementation does nothing in them.
+ @Override
+ public void registerMetrics(MetricRegistry registry) {
+ // intentionally empty: nothing is registered
+ }
+ // Nothing is read from the configuration.
+ @Override
+ public void init(Configuration config) {
+ }
+ // Hands out the handler created above with the no-arg constructor, i.e. one without deletion listeners.
+ @Override
+ public SegmentDeletionHandler getSegmentDeletionHandler() {
+ return deletionHandler;
+ }
+ // Nothing to start.
+ @Override
+ public void start() {
}
@Override
- protected void postProcessRecords(GenericRow row, int docId) {
- // nothing
+ public void stop() {
}
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java
new file mode 100644
index 0000000..1aba06b
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.upsert;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.pinot.core.segment.updater.SegmentDeletionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Notifies a fixed list of {@link SegmentDeletionListener}s that a segment is being deleted from this server.
+ */
+public class SegmentDeletionHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentDeletionHandler.class);
+
+  // Immutable snapshot taken at construction, so later changes to the caller's list cannot alter which
+  // listeners get notified.
+  private final List<SegmentDeletionListener> _deleteListeners;
+
+  /** Creates a handler without listeners; {@link #deleteSegmentFromLocalStorage} then only logs. */
+  public SegmentDeletionHandler() {
+    this(ImmutableList.of());
+  }
+
+  /**
+   * Creates a handler that notifies the given listeners.
+   *
+   * @param listeners listeners to notify, in list order; must be non-null and must not contain null
+   * @throws NullPointerException if {@code listeners} or one of its elements is null
+   */
+  public SegmentDeletionHandler(List<SegmentDeletionListener> listeners) {
+    _deleteListeners = ImmutableList.copyOf(listeners);
+  }
+
+  /**
+   * Calls {@link SegmentDeletionListener#onSegmentDeletion} on every registered listener.
+   *
+   * <p>An exception thrown by one listener is logged and does not prevent the remaining listeners from
+   * being called; nothing is propagated to the caller.
+   *
+   * @param tableNameWithType table name passed through to the listeners
+   * @param segmentName name of the segment passed through to the listeners
+   */
+  public void deleteSegmentFromLocalStorage(String tableNameWithType, String segmentName) {
+    //TODO move the deletion from file system logic to here
+    LOGGER.info("trying to perform deletion for {}:{} on {} deletion handler", tableNameWithType, segmentName,
+        _deleteListeners.size());
+    for (SegmentDeletionListener deletionListener : _deleteListeners) {
+      try {
+        deletionListener.onSegmentDeletion(tableNameWithType, segmentName);
+      } catch (Exception ex) {
+        // Best effort: keep notifying the other listeners even if this one fails.
+        LOGGER.error("failed to delete table {} segment {} with exception", tableNameWithType, segmentName, ex);
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
similarity index 69%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
rename to pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
index 97d48f3..7f636fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
@@ -16,10 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.updater;
+package org.apache.pinot.server.upsert;
-public class SegmentUpdaterConfig {
- public static final String SEGMENT_UDPATE_SLEEP_MS = "sleep.ms";
- public static final String INPUT_TOPIC_PREFIX = "input.topic.prefix";
- public static final int SEGMENT_UDPATE_SLEEP_MS_DEFAULT = 100;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.configuration.Configuration;
+
+public interface UpsertComponentContainer {
+ // NOTE(review): presumably lets the implementation add its metrics to the given registry; no caller is visible in this change - confirm.
+ void registerMetrics(MetricRegistry registry);
+ // Called by UpsertComponentContainerProvider with its configuration right after the implementation is instantiated reflectively.
+ void init(Configuration config);
+ // Returns the handler whose deleteSegmentFromLocalStorage() notifies the registered segment deletion listeners.
+ SegmentDeletionHandler getSegmentDeletionHandler();
+ // NOTE(review): start/stop look like lifecycle hooks for the server to call; their callers are not visible in this change - confirm.
+ void start();
+
+ void stop();
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
new file mode 100644
index 0000000..fe4af07
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.upsert;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.core.segment.updater.WatermarkManager;
+import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates and initializes the {@link UpsertComponentContainer} implementation selected by the server
+ * configuration.
+ *
+ * <p>The class name is read from {@link #UPSERT_COMPONENT_CONFIG_KEY}; when the key is absent,
+ * {@link DefaultUpsertComponentContainer} is used.
+ */
+public class UpsertComponentContainerProvider {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerProvider.class);
+
+  // NOTE(review): the key still says "watermarkManager" although it selects the component container class;
+  // it is left unchanged here so existing configurations keep working.
+  public static final String UPSERT_COMPONENT_CONFIG_KEY = "watermarkManager.class";
+  public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName();
+
+  private final Configuration _conf;
+  private UpsertComponentContainer _instance;
+
+  /**
+   * Loads the configured container class, instantiates it through its no-arg constructor and calls
+   * {@link UpsertComponentContainer#init(Configuration)} on it. Any failure is logged and rethrown unchanged.
+   *
+   * @param conf configuration that holds the class name and is passed on to the container's {@code init}
+   * @param metrics currently not used by this constructor
+   * @throws IllegalStateException if the configured class does not implement {@link UpsertComponentContainer}
+   */
+  public UpsertComponentContainerProvider(Configuration conf, GrigioMetrics metrics) {
+    _conf = conf;
+    String className = _conf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT);
+    LOGGER.info("creating upsert component container with class {}", className);
+    try {
+      Class<?> componentContainerClass = Class.forName(className);
+      // Class#isAssignableFrom reads "receiver is a supertype of the argument", so the interface must be the
+      // receiver and the configured class the argument. The previous check was inverted and compared against
+      // WatermarkManager, which rejected every valid container, including the default one.
+      Preconditions.checkState(UpsertComponentContainer.class.isAssignableFrom(componentContainerClass),
+          "configured class %s does not implement UpsertComponentContainer", className);
+      _instance = componentContainerClass.asSubclass(UpsertComponentContainer.class).newInstance();
+      _instance.init(_conf);
+    } catch (Exception e) {
+      // The placeholder keeps className in the message; the trailing exception is logged with its stack trace.
+      LOGGER.error("failed to load upsert component container class {}", className, e);
+      ExceptionUtils.rethrow(e);
+    }
+  }
+
+  /** Returns the container created by the constructor. */
+  public UpsertComponentContainer getInstance() {
+    return _instance;
+  }
+}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 08ab8c3..60702c4 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
@@ -122,7 +123,7 @@ public abstract class BaseResourceTest {
ImmutableSegment immutableSegment =
ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap);
_indexSegments.add(immutableSegment);
- _tableDataManagerMap.get(TABLE_NAME).addSegment(immutableSegment);
+ _tableDataManagerMap.get(TABLE_NAME).addSegment(immutableSegment, DefaultDataManagerCallbackImpl.INSTANCE);
return immutableSegment;
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 187b36f..ac989bb 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -26,7 +26,8 @@ import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.core.indexsegment.mutable.MutableAppendSegmentImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultIndexSegmentCallback;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.spi.utils.DataSize;
@@ -136,7 +137,8 @@ public class MemoryEstimator {
.setStatsHistory(sampleStatsHistory);
// create mutable segment impl
- MutableSegmentImpl mutableSegmentImpl = new MutableAppendSegmentImpl(realtimeSegmentConfigBuilder.build());
+ MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(),
+ DefaultIndexSegmentCallback.INSTANCE);
// read all rows and index them
try (PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_sampleCompletedSegment);) {
@@ -236,7 +238,8 @@ public class MemoryEstimator {
.setOffHeap(true).setMemoryManager(memoryManager).setStatsHistory(statsHistory);
// create mutable segment impl
- MutableSegmentImpl mutableSegmentImpl = new MutableAppendSegmentImpl(realtimeSegmentConfigBuilder.build());
+ MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(),
+ DefaultIndexSegmentCallback.INSTANCE);
long memoryForConsumingSegmentPerPartition = memoryManager.getTotalAllocatedBytes();
mutableSegmentImpl.destroy();
FileUtils.deleteQuietly(statsFileCopy);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org