Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/05/15 08:07:08 UTC

[GitHub] [incubator-pinot] jamesyfshao opened a new pull request #5394: add callback interface for upsert component

jamesyfshao opened a new pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394


   Added a list of critical interfaces that we need to inject into pinot core/server, so that we can add the proper logic for upsert-enabled pinot servers and tables while making sure existing pinot tables perform the same as before. Pinot upsert-related logic will be implemented in a separate package and injected into the runtime environment via config. This diff also provides a no-op implementation to ensure that the existing pinot ingestion workflow stays the same.
   
   Design doc for the pinot upsert feature: [google doc](https://docs.google.com/document/d/1SFFir7ByxCff-aVYxQeTHpNhPXeP5q7P4g_6O2iNGgU/edit?usp=sharing) (our corporate privacy policy disallows blanket sharing, so please send me a view/comment request; I usually accept it as soon as I see it)
   
   This change focuses on pinot core/server.
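   A minimal sketch of the injection pattern described above, assuming a config key that names the callback class and a fallback to a no-op implementation when the key is absent. The interface `RowCallback`, the classes `NoOpRowCallback` and `RowCallbackFactory`, and the key `upsert.callback.class` are hypothetical illustrations, not names from Pinot or from this PR.

```java
import java.util.Map;

/** Hypothetical callback interface standing in for the PR's callback interfaces. */
interface RowCallback {
  void onRowIndexed(String segmentName, long offset);
}

/** No-op default: tables without upsert keep their existing ingestion behavior. */
class NoOpRowCallback implements RowCallback {
  @Override
  public void onRowIndexed(String segmentName, long offset) {
    // Intentionally empty.
  }
}

/** Chooses the callback implementation from config (hypothetical key). */
final class RowCallbackFactory {
  static final String CALLBACK_CLASS_KEY = "upsert.callback.class";

  private RowCallbackFactory() {
  }

  static RowCallback create(Map<String, String> config) {
    String className = config.get(CALLBACK_CLASS_KEY);
    if (className == null) {
      // No implementation configured: behave exactly as before.
      return new NoOpRowCallback();
    }
    try {
      // Load the named class, require it to implement RowCallback, and call its no-arg constructor.
      return Class.forName(className)
          .asSubclass(RowCallback.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException("Cannot instantiate callback class " + className, e);
    }
  }
}
```

   Keeping the no-op as the default means only tables whose config names an upsert implementation pay for the extra logic.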


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] mcvsubbu commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-657650284


   @jamesyfshao can we take the discussion to the design doc? I had asked a few questions there, but they are not resolved yet. Please see my comments on the "New Interface" section. 
   
    https://docs.google.com/document/d/1SFFir7ByxCff-aVYxQeTHpNhPXeP5q7P4g_6O2iNGgU/edit
   



[GitHub] [incubator-pinot] jamesyfshao commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-637721377


   > Hey James, the last we discussed, I thought the only callback needed was after the realtime row was indexed.
   > 
   > Can you add to your design doc the following information
   > 
   > 1. At what points do you need callbacks from Pinot
   > 2. What does the callback do?
   > 
   > For example:
   > Callbacks Needed:
   > 
   > 1. onRowIndexed(): This callback needs to happen each time a row ingested from a realtime stream-partition is successfully indexed. Upon receiving this callback, the pinot-upsert pushes an event into Kafka (see xxx of design doc). Arguments to this callback should be:
   >    a.  Offset of the row
   >    b. Table name
   >    c. Segment name
   >    d. : whatever...
   > 2. onSegmentLoad(): This callback needs to happen each time an offline segment is loaded successfully. ... etc.
   > 
   > I know this is probably spread across your design doc, but to have it concisely in one place will help us review.
   > 
   > thanks a lot, and sorry for the delay in review. I had a hard deadline of May 31 on an internal project that I have been working on.
   
   Thanks for reviewing. I have added a section [here](https://docs.google.com/document/d/1SFFir7ByxCff-aVYxQeTHpNhPXeP5q7P4g_6O2iNGgU/edit#heading=h.5g392vjhtywt). Also, all the current callbacks are contained in this [package](https://github.com/jamesyfshao/pinot/tree/ecbef927507edd4239682aab3b0ef0f65a351a78/pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback). If you have time to apply the patch to a local pinot repo, reviewing will be more straightforward, and the methods themselves are also documented.
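   To make the reviewer's proposed list concrete, here is a rough Java sketch of those two callback points, with an in-memory queue standing in for the Kafka producer. The method names `onRowIndexed` and `onSegmentLoad` come from the comment above; offsets simplified to `long`, the `onSegmentLoad` arguments, `RowIndexedEvent`, and the queue-based implementation are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Callback points named in the review; argument lists beyond (a)-(c) are assumed. */
interface UpsertCallbacks {
  /** Called each time a row from a realtime stream partition is successfully indexed. */
  void onRowIndexed(long offset, String tableName, String segmentName);

  /** Called each time an offline segment is loaded successfully (arguments assumed). */
  void onSegmentLoad(String tableName, String segmentName);
}

/** Event emitted for each indexed row; stands in for the message sent to Kafka. */
final class RowIndexedEvent {
  final long offset;
  final String tableName;
  final String segmentName;

  RowIndexedEvent(long offset, String tableName, String segmentName) {
    this.offset = offset;
    this.tableName = tableName;
    this.segmentName = segmentName;
  }
}

/** Sketch implementation: queues one event per indexed row instead of producing to Kafka. */
class QueueingUpsertCallbacks implements UpsertCallbacks {
  private final BlockingQueue<RowIndexedEvent> events = new LinkedBlockingQueue<>();
  private final List<String> loadedSegments = new ArrayList<>();

  @Override
  public void onRowIndexed(long offset, String tableName, String segmentName) {
    events.add(new RowIndexedEvent(offset, tableName, segmentName));
  }

  @Override
  public void onSegmentLoad(String tableName, String segmentName) {
    loadedSegments.add(tableName + "/" + segmentName);
  }

  BlockingQueue<RowIndexedEvent> events() {
    return events;
  }

  List<String> loadedSegments() {
    return loadedSegments;
  }
}
```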
   
   



[GitHub] [incubator-pinot] jamesyfshao commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-652836506


   > @jamesyfshao is there anything pending here? Let's get this in.
   
   Waiting for @mcvsubbu to give the diff another pass to see whether my update has addressed the concerns.



[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r446716088



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/IndexSegmentCallback.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.Map;
+
+/**
+ * Component injected into {@link IndexSegment} to handle extra logic for
+ * upsert-enabled pinot ingestion mode
+ */
+@InterfaceStability.Evolving
+public interface IndexSegmentCallback {
+
+  /**
+   * Initialize the callback from {@link IndexSegment}. This happens in the constructor of the IndexSegment implementation class.

Review comment:
       Updated method names in this callback interface correspondingly as well.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/IndexSegmentCallback.java
##########
@@ -0,0 +1,77 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.Map;
+
+/**
+ * Component injected into {@link IndexSegment} to handle extra logic for
+ * upsert-enabled pinot ingestion mode
+ */
+@InterfaceStability.Evolving
+public interface IndexSegmentCallback {
+
+  /**
+   * Initialize the callback from {@link IndexSegment}. This happens in the constructor of the IndexSegment implementation class.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will initialize the necessary virtual columns for upsert table
+   * (offset mapping, $validFrom, $validUntil columns)
+   *
+   * This method ensures that the virtual column indexes (forward/inverted index) can be created properly, and loads
+   * any existing data for the virtual columns from local storage.
+   *
+   * @param segmentMetadata the metadata associated with the current segment
+   * @param columnIndexContainerMap mapping from each necessary column name to its associated ColumnIndexContainer
+   */
+  void init(SegmentMetadata segmentMetadata, Map<String, ColumnIndexContainer> columnIndexContainerMap);
+
+  /**
+   * Perform the callback's operation for the given row after it has been processed and indexed.
+   * This method is called after indexing finishes in the MutableSegmentImpl.index(GenericRow, RowMetadata) method.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will build offset mapping and update virtual columns for the column if necessary
+   *
+   * This method is similar to
+   * {@link org.apache.pinot.core.data.manager.callback.DataManagerCallback#postIndexProcessing(GenericRow, StreamPartitionMsgOffset)}
+   * However, that method does not have access to the docId, which the upsert virtual columns need in order
+   * to build their forward index and the offset column that maps offset -> docId.
+   * We might also need to update the virtual column forward index for the newly ingested row.
+   *
+   * @param row the current pinot row we just indexed into the current IndexSegment
+   * @param docId the docId of this record
+   */
+  void postProcessRecords(GenericRow row, int docId);

Review comment:
       done
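   The javadoc above explains that postProcessRecords needs the docId so the upsert virtual columns can map a stream offset to a docId, and later apply updates keyed by offset. A minimal, self-contained sketch of that bookkeeping, assuming offsets can be represented as `long` and a single `validUntil` value per document; `OffsetDocIdTracker`, `markSuperseded`, and the `STILL_VALID` sentinel are hypothetical and greatly simplify the PR's `$validFrom`/`$validUntil` columns.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified upsert index-segment callback state. Pinot types such as
 * GenericRow and StreamPartitionMsgOffset are replaced by plain long/int values.
 */
class OffsetDocIdTracker {
  /** Sentinel meaning "not superseded yet" in the validUntil column. */
  static final long STILL_VALID = Long.MAX_VALUE;

  private final Map<Long, Integer> offsetToDocId = new HashMap<>();
  private final long[] validUntil;

  OffsetDocIdTracker(int maxDocs) {
    validUntil = new long[maxDocs];
    Arrays.fill(validUntil, STILL_VALID);
  }

  /** Plays the role of postProcessRecords(row, docId): remember which docId this offset landed in. */
  void postProcessRecord(long offset, int docId) {
    offsetToDocId.put(offset, docId);
  }

  /**
   * Applies an update event keyed by offset: the row at {@code offset} was superseded by the
   * row at {@code supersedingOffset}. Returns false if the offset is not in this segment.
   */
  boolean markSuperseded(long offset, long supersedingOffset) {
    Integer docId = offsetToDocId.get(offset);
    if (docId == null) {
      return false;
    }
    validUntil[docId] = supersedingOffset;
    return true;
  }

  long validUntil(int docId) {
    return validUntil[docId];
  }
}
```

   Without the docId argument, the callback would have no way to locate the slot in the virtual column that an offset-keyed update should modify.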





[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r443148746



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} to handle extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;

Review comment:
       Please document if this method is to be called once during system startup, or once for every table.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} to handle extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {

Review comment:
       the interface definition should go into pinot-spi

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} to handle extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * Create an {@link IndexSegmentCallback} object that allows SegmentDataManager to create a proper IndexSegment supporting
+   * either append or upsert mode.
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create a UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();
+
+  /**
+   * Process the row after transformation in the LLRealtimeSegmentDataManager.processStreamEvents(...) method.
+   * This happens after the GenericRow has been transformed by the RecordTransformer and before it is indexed by
+   * IndexSegmentImpl, so that we can provide other necessary data to the segment metadata.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will attach the offset object into the GenericRow object.
+   *
+   * We need this logic because in an upsert table we must add offset data to the physical data;
+   * this lets us correctly apply the update events from the key coordinator to the upsert table, as the offset
+   * is used as the index to identify which record's virtual column we want to update.
+   *
+   * @param row the row of newly ingested and transformed data from upstream
+   * @param offset the offset of this particular pinot record
+   */
+  void processTransformedRow(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * Process the row after indexing in the LLRealtimeSegmentDataManager.processStreamEvents(...) method.
+   * This happens after MutableSegmentImpl has finished indexing the current row in its physical storage.
+   *
+   * In append-tables callback, this method will do nothing

Review comment:
       All realtime tables are append tables, so let us keep the terminology right. We can remove this comment, or restate it to say that the call needs to happen only if Upsert is ON for the table. 
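   One way to express the suggestion that the call should happen only when upsert is ON for the table is to guard the call site explicitly. A toy sketch, assuming the upsert flag is read from the table config and offsets are consecutive `long` values (a real StreamPartitionMsgOffset need not be numeric); `PostIndexHook` and `ToyConsumeLoop` are hypothetical names, not Pinot classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-row hook, standing in for postIndexProcessing(row, offset). */
interface PostIndexHook {
  void postIndexProcessing(String row, long offset);
}

/** Toy consume loop: always indexes, but calls the hook only for upsert-enabled tables. */
class ToyConsumeLoop {
  private final boolean upsertEnabled;  // assumed to come from the table config
  private final PostIndexHook hook;
  private final List<String> indexedRows = new ArrayList<>();

  ToyConsumeLoop(boolean upsertEnabled, PostIndexHook hook) {
    this.upsertEnabled = upsertEnabled;
    this.hook = hook;
  }

  void consume(List<String> rows, long startOffset) {
    long offset = startOffset;
    for (String row : rows) {
      indexedRows.add(row);  // stand-in for indexing the row into the mutable segment
      if (upsertEnabled) {
        hook.postIndexProcessing(row, offset);
      }
      offset++;
    }
  }

  int indexedCount() {
    return indexedRows.size();
  }
}
```

   With this shape, tables without upsert never reach the hook at all, which avoids describing them with a separate "append-tables callback".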

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/IndexSegmentCallback.java
##########
@@ -0,0 +1,77 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.Map;
+
+/**
+ * Component injected into {@link IndexSegment} to handle extra logic for
+ * upsert-enabled pinot ingestion mode
+ */
+@InterfaceStability.Evolving
+public interface IndexSegmentCallback {
+
+  /**
+   * Initialize the callback from {@link IndexSegment}. This happens in the constructor of the IndexSegment implementation class.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will initialize the necessary virtual columns for upsert table
+   * (offset mapping, $validFrom, $validUntil columns)
+   *
+   * This method ensures that the virtual column indexes (forward/inverted index) can be created properly, and loads
+   * any existing data for the virtual columns from local storage.
+   *
+   * @param segmentMetadata the metadata associated with the current segment
+   * @param columnIndexContainerMap mapping from each necessary column name to its associated ColumnIndexContainer
+   */
+  void init(SegmentMetadata segmentMetadata, Map<String, ColumnIndexContainer> columnIndexContainerMap);
+
+  /**
+   * Perform the callback's operation for the given row after it has been processed and indexed.
+   * This method is called after indexing finishes in the MutableSegmentImpl.index(GenericRow, RowMetadata) method.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will build offset mapping and update virtual columns for the column if necessary
+   *
+   * This method is similar to
+   * {@link org.apache.pinot.core.data.manager.callback.DataManagerCallback#postIndexProcessing(GenericRow, StreamPartitionMsgOffset)}
+   * However, that method does not have access to the docId, which the upsert virtual columns need in order
+   * to build their forward index and the offset column that maps offset -> docId.
+   * We might also need to update the virtual column forward index for the newly ingested row.
+   *
+   * @param row the current pinot row we just indexed into the current IndexSegment
+   * @param docId the docId of this record
+   */
+  void postProcessRecords(GenericRow row, int docId);
+
+  /**
+   * Retrieve information about an upsert-enabled segment's virtual columns for debugging purposes.
+   *
+   * @param offset the offset of the record we are trying to get the virtual column data for
+   * @return string representation of the virtual column data information
+   */
+  String getVirtualColumnInfo(StreamPartitionMsgOffset offset);

Review comment:
       You should get the schema, table config, etc. via init(), so I am not sure why we need this virtual column info call. Also, why should Pinot code call this method? Let us remove the debug calls for now, and we can add back things we need once we have the overall setup right.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/IndexSegmentCallback.java
##########
@@ -0,0 +1,77 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.Map;
+
+/**
+ * Component injected into {@link IndexSegment} to handle extra logic for
+ * upsert-enabled pinot ingestion mode
+ */
+@InterfaceStability.Evolving
+public interface IndexSegmentCallback {
+
+  /**
+   * Initialize the callback from {@link IndexSegment}. This happens in the constructor of the IndexSegment implementation class.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will initialize the necessary virtual columns for upsert table
+   * (offset mapping, $validFrom, $validUntil columns)
+   *
+   * This method ensures that the virtual column indexes (forward/inverted index) can be created properly, and loads
+   * any existing data for the virtual columns from local storage.
+   *
+   * @param segmentMetadata the metadata associated with the current segment
+   * @param columnIndexContainerMap mapping from each necessary column name to its associated ColumnIndexContainer
+   */
+  void init(SegmentMetadata segmentMetadata, Map<String, ColumnIndexContainer> columnIndexContainerMap);
+
+  /**
+   * Perform the callback's operation for the given row after it has been processed and indexed.
+   * This method is called after indexing finishes in the MutableSegmentImpl.index(GenericRow, RowMetadata) method.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will build offset mapping and update virtual columns for the column if necessary
+   *
+   * This method is similar to
+   * {@link org.apache.pinot.core.data.manager.callback.DataManagerCallback#postIndexProcessing(GenericRow, StreamPartitionMsgOffset)}
+   * However, that method does not have access to the docId, which the upsert virtual columns need in order
+   * to build their forward index and the offset column that maps offset -> docId.
+   * We might also need to update the virtual column forward index for the newly ingested row.
+   *
+   * @param row the current pinot row we just indexed into the current IndexSegment
+   * @param docId the docId of this record
+   */
+  void postProcessRecords(GenericRow row, int docId);

Review comment:
       We should make ONE callback with whatever information needed.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} to handle extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * Create an {@link IndexSegmentCallback} object that allows SegmentDataManager to create a proper IndexSegment supporting
+   * either append or upsert mode.
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create a UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();
+
+  /**
+   * Process the row after transformation in the LLRealtimeSegmentDataManager.processStreamEvents(...) method.
+   * This happens after the GenericRow has been transformed by the RecordTransformer and before it is indexed by
+   * IndexSegmentImpl, so that we can provide other necessary data to the segment metadata.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will attach the offset object into the GenericRow object.
+   *
+   * We need this logic because in an upsert table we must add offset data to the physical data;
+   * this lets us correctly apply the update events from the key coordinator to the upsert table, as the offset
+   * is used as the index to identify which record's virtual column we want to update.
+   *
+   * @param row the row of newly ingested and transformed data from upstream
+   * @param offset the offset of this particular pinot record
+   */
+  void processTransformedRow(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * Process the row after indexing in the LLRealtimeSegmentDataManager.processStreamEvents(...) method.
+   * This happens after MutableSegmentImpl has finished indexing the current row in its physical storage.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will emit an event to the message queue that will deliver the event to
+   * key coordinator.
+   *
+   * This method ensures that we can emit the metadata for a new entry that pinot just indexed into its internal storage,
+   * so that the key coordinator can consume those events and process the updates correctly.
+   *
+   * @param row the row we just indexed in the current segment
+   * @param offset the offset associated with the current row
+   */
+  void postIndexProcessing(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * Perform any necessary finalization after the consumer loop finishes in the LLRealtimeSegmentDataManager.consumeLoop(...)
+   * method. This happens after the consumer loop exits upon reaching its end criteria.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will flush the queue producer to ensure all pending messages are deliverd
+   * to the queue between pinot server and pinot key-coordinator
+   *
+   * this method will ensure that pinot server can send all events to key coordinator eventually before a segment
+   * is committed. If this does not happen we might lose data in case of machine failure.
+   */
+  void postConsumeLoop();
+
+  /**
+   * perform any necessary clean-up operations when the SegmentDataManager calls its destroy() method.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will notify segmentUpdater to remove any registered reference for this
+   * dataManagerCallback.
+   *
+   * This method ensures that segmentUpdater can keep track of which data managers are still alive in the current
+   * pinot server, so it dispatches update events only to the live data managers
+   */
+  void destroy();

Review comment:
       onSegmentDestroy().
   Also, you can use this callback for the previous one. Or, just key off of previous one.
   
   If I understand your proposal correctly, you are also getting a callback when the IndexSegment is loaded. In that case, you can remove this call as well as the previous one, and just use the callback when the index segment is loaded. It will also serve the use case when the segment goes from OFFLINE to ONLINE without going through the CONSUMING state.
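The load/destroy alternative described above could look roughly like the following minimal sketch. It is not an existing Pinot API: `SegmentLifecycleCallback`, `onSegmentLoaded`, `onSegmentDestroy`, and the `SegmentStateDriver` transition methods are hypothetical names, and segments are identified by plain strings. What it illustrates is that the CONSUMING to ONLINE and OFFLINE to ONLINE paths both reach the same hook.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle hooks keyed off segment load/destroy rather than consume-loop internals.
interface SegmentLifecycleCallback {
  // Called once an immutable segment is loaded and ready to serve, whether it
  // arrived via CONSUMING -> ONLINE or directly via OFFLINE -> ONLINE.
  void onSegmentLoaded(String segmentName);

  // Called when the server drops the segment (e.g. retention or rebalance).
  void onSegmentDestroy(String segmentName);
}

// Records events so their order can be inspected.
class RecordingLifecycleCallback implements SegmentLifecycleCallback {
  final List<String> events = new ArrayList<>();

  @Override
  public void onSegmentLoaded(String segmentName) {
    events.add("loaded:" + segmentName);
  }

  @Override
  public void onSegmentDestroy(String segmentName) {
    events.add("destroyed:" + segmentName);
  }
}

// Hypothetical driver: both ONLINE transitions funnel into the same hook.
class SegmentStateDriver {
  private final SegmentLifecycleCallback callback;

  SegmentStateDriver(SegmentLifecycleCallback callback) {
    this.callback = callback;
  }

  void consumingToOnline(String segmentName) {
    // (placeholder) build or download the committed segment, then swap out the mutable one
    callback.onSegmentLoaded(segmentName);
  }

  void offlineToOnline(String segmentName) {
    // (placeholder) download and load the segment
    callback.onSegmentLoaded(segmentName);
  }

  void onlineToOffline(String segmentName) {
    callback.onSegmentDestroy(segmentName);
  }
}
```

Because both transitions end in `onSegmentLoaded`, an upsert implementation written this way would not need a separate code path for segments that are downloaded rather than built from a consuming segment.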

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/IndexSegmentCallback.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.Map;
+
+/**
+ * Component injected into {@link IndexSegment} for handling extra logic for
+ * upsert-enabled pinot ingestion mode
+ */
+@InterfaceStability.Evolving
+public interface IndexSegmentCallback {
+
+  /**
+   * Initialize the callback from {@link IndexSegment}. This happens in the constructor of the IndexSegment implementation class

Review comment:
       You should not indicate which class should call this method. Instead, indicate on what condition you want this call to happen. Is it before the segment is loaded successfully? Or after the segment is loaded successfully? If the method name starts with "on", the name itself can answer that.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/IndexSegmentCallback.java
##########
@@ -0,0 +1,77 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.Map;
+
+/**
+ * Component injected into {@link IndexSegment} for handling extra logic for
+ * upsert-enabled pinot ingestion mode
+ */
+@InterfaceStability.Evolving
+public interface IndexSegmentCallback {
+
+  /**
+   * Initialize the callback from {@link IndexSegment}. This happens in the constructor of the IndexSegment implementation class
+   *
+   * In append-tables callback, this method will do nothing

Review comment:
       same comment as before, will not repeat.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * create an {@link IndexSegmentCallback} object to allow SegmentDataManager to create a proper IndexSegment that
+   * supports either append or upsert mode
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create an UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();

Review comment:
       It is not clear why we are returning a callback object from within another callback object, but let us get the coarse details right and let me understand the architecture before I propose anything.
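To make the current shape easier to discuss: as written, `DataManagerCallback` acts as a per-data-manager factory, and `getIndexSegmentCallback()` hands each new IndexSegment its own callback through the constructor. The sketch below illustrates only that relationship. All `Sketch*` and `NoOp*` types are hypothetical simplifications (no Pinot types, and `init` takes just a segment name), with the append-table case implemented as no-ops.

```java
// Hypothetical, simplified stand-in for the per-segment callback in the diff.
interface SketchIndexSegmentCallback {
  void init(String segmentName);
}

// Hypothetical stand-in for DataManagerCallback: a factory for per-segment callbacks.
interface SketchDataManagerCallback {
  SketchIndexSegmentCallback getIndexSegmentCallback();
}

// Append tables: nothing to do, so existing ingestion behaves as before.
class NoOpIndexSegmentCallback implements SketchIndexSegmentCallback {
  @Override
  public void init(String segmentName) {
    // intentionally empty
  }
}

class NoOpDataManagerCallback implements SketchDataManagerCallback {
  @Override
  public SketchIndexSegmentCallback getIndexSegmentCallback() {
    // A fresh instance per segment, so per-segment state never leaks across segments.
    return new NoOpIndexSegmentCallback();
  }
}

// The segment receives its callback through the constructor and initializes it there.
class SketchIndexSegment {
  final String name;
  final SketchIndexSegmentCallback callback;

  SketchIndexSegment(String name, SketchIndexSegmentCallback callback) {
    this.name = name;
    this.callback = callback;
    callback.init(name);
  }
}

// The data manager owns one DataManagerCallback and asks it for each segment's callback.
class SketchDataManager {
  private final SketchDataManagerCallback dataManagerCallback;

  SketchDataManager(SketchDataManagerCallback dataManagerCallback) {
    this.dataManagerCallback = dataManagerCallback;
  }

  SketchIndexSegment createSegment(String name) {
    return new SketchIndexSegment(name, dataManagerCallback.getIndexSegmentCallback());
  }
}
```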

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * create an {@link IndexSegmentCallback} object to allow SegmentDataManager to create a proper IndexSegment that
+   * supports either append or upsert mode
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create an UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();
+
+  /**
+   * process the row after transformation in LLRealtimeSegmentDataManager.processStreamEvents(...) method
+   * it happens after the GenericRow has been transformed by RecordTransformer and before it is indexed by
+   * IndexSegmentImpl, to ensure we can provide other necessary data to the segment metadata.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will attach the offset object into the GenericRow object.
+   *
+   * The reason we need this logic is that an upsert table must store the offset alongside the physical data.
+   * The offset is used as the index that identifies which record's virtual column an update event from the key
+   * coordinator applies to, so storing it lets us apply those updates to the upsert table correctly.
+   *
+   * @param row the row of newly ingested and transformed data from upstream
+   * @param offset the offset of this particular pinot record
+   */
+  void processTransformedRow(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * process the row after indexing in LLRealtimeSegmentDataManager.processStreamEvents(...) method
+   * it happens after the MutableSegmentImpl has done the indexing of the current row in its physical storage
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will emit an event to the message queue that will deliver the event to
+   * key coordinator.
+   *
+   * This method ensures that we emit the metadata for each new entry that pinot has just indexed into its internal
+   * storage, so that the key coordinator can consume those events and process the updates correctly.
+   *
+   * @param row the row just indexed in the current segment
+   * @param offset the offset associated with the current row
+   */
+  void postIndexProcessing(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * perform any necessary finalizing operations after the consumer loop finishes in LLRealtimeSegmentDataManager.consumeLoop(...)
+   * method. It happens after the consumer loop has reached its end criteria and exited.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will flush the queue producer to ensure all pending messages are delivered
+   * to the queue between pinot server and pinot key-coordinator
+   *
+   * This method ensures that the pinot server sends all events to the key coordinator before a segment
+   * is committed. If this does not happen, we might lose data in case of machine failure.
+   */
+  void postConsumeLoop();

Review comment:
       consumeLoop() is an implementation detail. If we rename that method, this API name will have no meaning. Please indicate clearly _when_ you want this callback, and we can decide on a name. Do you want the callback to happen after a successful CONSUMING to ONLINE state transition (i.e. the completed segment has been loaded in memory, and we are ready to get rid of the CONSUMING segment memory for this segment)? Or do you really want it to be called even if we may catch up later with some other server because we got a CATCHUP from the controller? (This can be VERY hard to verify for correctness, and I would question why that is needed.)

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {

Review comment:
       A suggestion to make it more readable. All callback methods should be named starting with "on". So, `onSegmentConsumingToOnline` or `onSegmentOfflineToOnline`, or `onSegmentStartConsuming`, etc.
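One way to combine the "on" naming convention with the no-op behavior that append tables need is Java 8 default methods, so an append table needs no implementation class at all. The sketch below is hypothetical: the method names follow the reviewer's examples, the `String segmentName` parameter is an assumption, and `SegmentEventCallback` is not an existing Pinot interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface: each hook is named for the event it reports
// and defaults to a no-op, so append tables keep their current behavior.
interface SegmentEventCallback {
  default void onSegmentStartConsuming(String segmentName) {}

  default void onSegmentConsumingToOnline(String segmentName) {}

  default void onSegmentOfflineToOnline(String segmentName) {}

  default void onSegmentDestroy(String segmentName) {}
}

// Append tables: the defaults are enough.
class AppendTableCallback implements SegmentEventCallback {}

// Hypothetical upsert callback that overrides only the events it cares about.
class LoggingUpsertCallback implements SegmentEventCallback {
  final List<String> log = new ArrayList<>();

  @Override
  public void onSegmentConsumingToOnline(String segmentName) {
    log.add("consuming->online " + segmentName);
  }

  @Override
  public void onSegmentDestroy(String segmentName) {
    log.add("destroy " + segmentName);
  }
}
```

One trade-off of default methods is that a forgotten override fails silently rather than at compile time; that may be acceptable for an `@InterfaceStability.Evolving` interface, but it is worth keeping in mind.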

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * create an {@link IndexSegmentCallback} object to allow SegmentDataManager to create a proper IndexSegment that
+   * supports either append or upsert mode
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create an UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();
+
+  /**
+   * process the row after transformation in LLRealtimeSegmentDataManager.processStreamEvents(...) method
+   * it happens after the GenericRow has been transformed by RecordTransformer and before it is indexed by
+   * IndexSegmentImpl, to ensure we can provide other necessary data to the segment metadata.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will attach the offset object into the GenericRow object.
+   *
+   * The reason we need this logic is that an upsert table must store the offset alongside the physical data.
+   * The offset is used as the index that identifies which record's virtual column an update event from the key
+   * coordinator applies to, so storing it lets us apply those updates to the upsert table correctly.
+   *
+   * @param row the row of newly ingested and transformed data from upstream
+   * @param offset the offset of this particular pinot record
+   */
+  void processTransformedRow(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * process the row after indexing in LLRealtimeSegmentDataManager.processStreamEvents(...) method
+   * it happens after the MutableSegmentImpl has done the indexing of the current row in its physical storage
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will emit an event to the message queue that will deliver the event to
+   * key coordinator.
+   *
+   * This method ensures that we emit the metadata for each new entry that pinot has just indexed into its internal
+   * storage, so that the key coordinator can consume those events and process the updates correctly.
+   *
+   * @param row the row just indexed in the current segment
+   * @param offset the offset associated with the current row
+   */
+  void postIndexProcessing(GenericRow row, StreamPartitionMsgOffset offset);

Review comment:
       rename as `onRealtimeRowIndexed()`. Indicate whether this method should be called in LLC or HLC or both. Indicate whether the method should be called if the row is aggregated (i.e. in the case where `aggregateMetrics` is turned on and the row is not indexed, but aggregated. See https://docs.pinot.apache.org/basics/components/table#tableindexconfig)
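The aggregation question matters because, with `aggregateMetrics` enabled, a row whose dimension (and time) values match an existing document is summed into that document instead of creating a new one, so there is no new doc for the key coordinator to track. The sketch below models one possible contract: `onRealtimeRowIndexed()` fires only for rows that become new documents. `ToyConsumingSegment`, its `index()` return convention (the new doc id, or -1 when aggregated), and the single `dim`/`metric` columns are hypothetical simplifications, not how `MutableSegmentImpl` behaves.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical callback using the name suggested in the review.
interface RealtimeRowCallback {
  void onRealtimeRowIndexed(Map<String, Object> row, long offset, int docId);
}

// Toy consuming segment with one dimension column ("dim") and one metric column ("metric").
class ToyConsumingSegment {
  private final boolean aggregateMetrics;
  private final Map<Object, Integer> docIdByDim = new HashMap<>();
  final List<Long> metricByDocId = new ArrayList<>();

  ToyConsumingSegment(boolean aggregateMetrics) {
    this.aggregateMetrics = aggregateMetrics;
  }

  // Returns the new doc id, or -1 if the row was summed into an existing doc.
  int index(Map<String, Object> row) {
    Object dim = row.get("dim");
    long metric = ((Number) row.get("metric")).longValue();
    if (aggregateMetrics) {
      Integer existing = docIdByDim.get(dim);
      if (existing != null) {
        metricByDocId.set(existing, metricByDocId.get(existing) + metric);
        return -1;
      }
    }
    int docId = metricByDocId.size();
    metricByDocId.add(metric);
    if (aggregateMetrics) {
      docIdByDim.put(dim, docId);
    }
    return docId;
  }
}

// Hypothetical consume step: the callback fires only for rows that became new docs.
class ToyRowProcessor {
  private final ToyConsumingSegment segment;
  private final RealtimeRowCallback callback;

  ToyRowProcessor(ToyConsumingSegment segment, RealtimeRowCallback callback) {
    this.segment = segment;
    this.callback = callback;
  }

  void process(Map<String, Object> row, long offset) {
    int docId = segment.index(row);
    if (docId >= 0) {
      callback.onRealtimeRowIndexed(row, offset, docId);
    }
  }

  static Map<String, Object> row(String dim, long metric) {
    Map<String, Object> row = new HashMap<>();
    row.put("dim", dim);
    row.put("metric", metric);
    return row;
  }
}
```

The opposite choice (also firing for aggregated rows, passing the existing doc id) is just as easy to express. What matters is that the Javadoc states which contract holds.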




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kishoreg commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
kishoreg commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-652789093


   @jamesyfshao, is there anything pending here? Let's get this in.


[GitHub] [incubator-pinot] jamesyfshao commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-637852570


   > @jamesyfshao I looked over your design doc, thanks for adding the section on interfaces you need. I also looked at the repo you mentioned.
   > It is not clear why we need all these interfaces. If you can add some clarifications in the design doc, that will help me better. We can then discuss online what the interface should be.
   > @kishoreg if you have additional comments
   
   @mcvsubbu I totally understand why you feel that way, as it is hard to lay out all the exact details without the code context. It can be confusing when we leave out the upsert-enabled callback implementation, which shows why we need these callbacks. But please understand that we left some of the exact interface details out of the design doc on purpose to keep it flexible: the code can change and evolve over time, but the design doc should capture the high-level design ideas and data flows that stay consistent as this feature evolves, especially since these interfaces will keep evolving over the next few PRs.
   
   Do you think it would be helpful if I put more comments and examples in the code (mainly on the interface methods) to show why these new callbacks are added and what their purpose is, instead of in the design doc? This way we can update the documentation as the interface evolves while keeping the design doc less noisy.


[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r446713896



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;

Review comment:
       Updated the documentation for the class to indicate that this is called in every dataManager constructor.
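For reference, "called in every dataManager constructor" could look like the minimal sketch below, where construction fails fast if the callback's `init()` throws. `ConstructorInitCallback`, `SketchSegmentDataManager`, and `CountingInitCallback` are hypothetical names; only the `void init() throws IOException` signature comes from the diff.

```java
import java.io.IOException;

// Mirrors the init() signature from the diff; everything else is hypothetical.
interface ConstructorInitCallback {
  void init() throws IOException;
}

class SketchSegmentDataManager {
  private final ConstructorInitCallback callback;

  // Each data manager initializes its own callback before it is usable,
  // so an init() failure surfaces as a failed construction.
  SketchSegmentDataManager(ConstructorInitCallback callback) throws IOException {
    this.callback = callback;
    callback.init();
  }
}

// Counts init() calls so the one-call-per-constructor contract can be checked.
class CountingInitCallback implements ConstructorInitCallback {
  int initCalls = 0;

  @Override
  public void init() {
    initCalls++;
  }
}
```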




[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r446715996



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * create an {@link IndexSegmentCallback} object to allow SegmentDataManager to create a proper IndexSegment that
+   * supports either append or upsert mode
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create an UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();
+
+  /**
+   * process the row after transformation in LLRealtimeSegmentDataManager.processStreamEvents(...) method
+   * it happens after the GenericRow has been transformed by RecordTransformer and before it is indexed by
+   * IndexSegmentImpl, to ensure we can provide other necessary data to the segment metadata.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will attach the offset object into the GenericRow object.
+   *
+   * The reason we need this logic is that an upsert table must store the offset alongside the physical data.
+   * The offset is used as the index that identifies which record's virtual column an update event from the key
+   * coordinator applies to, so storing it lets us apply those updates to the upsert table correctly.
+   *
+   * @param row the row of newly ingested and transformed data from upstream
+   * @param offset the offset of this particular pinot record
+   */
+  void processTransformedRow(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * process the row after indexing in LLRealtimeSegmentDataManager.processStreamEvents(...) method
+   * it happens after the MutableSegmentImpl has done the indexing of the current row in its physical storage
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will emit an event to the message queue that will deliver the event to
+   * key coordinator.
+   *
+   * This method ensures that we emit the metadata for each new entry that pinot has just indexed into its internal
+   * storage, so that the key coordinator can consume those events and process the updates correctly.
+   *
+   * @param row the row just indexed in the current segment
+   * @param offset the offset associated with the current row
+   */
+  void postIndexProcessing(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * perform any necessary finalizing operations after the consumer loop finishes in LLRealtimeSegmentDataManager.consumeLoop(...)
+   * method. It happens after the consumer loop has reached its end criteria and exited.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will flush the queue producer to ensure all pending messages are delivered
+   * to the queue between pinot server and pinot key-coordinator
+   *
+   * This method ensures that the pinot server sends all events to the key coordinator before a segment
+   * is committed. If this does not happen, we might lose data in case of machine failure.
+   */
+  void postConsumeLoop();
+
+  /**
+   * perform any necessary clean-up operations when the SegmentDataManager calls its destroy() method.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will notify segmentUpdater to remove any registered reference for this
+   * dataManagerCallback.
+   *
+   * This method ensures that segmentUpdater can keep track of which data managers are still alive in the current
+   * pinot server, so it dispatches update events only to the live data managers
+   */
+  void destroy();

Review comment:
       This method is intended to be called when the destroy() method is called in the dataManager. Note that the previous method and this method can be called at different times (the gap between consumption finishing and destroy can take seconds to minutes, depending on controller logic), so we cannot reuse the previous method, as they have different meanings. Also, a DataManager that handles an immutable segment will not call the previous method, but will call this one if the segment expires due to the retention policy.
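The distinction drawn here (flush when consumption ends; unregister when the data manager is destroyed, possibly much later, and only the latter for immutable segments) can be sketched as follows. `BufferedKeyCoordinatorProducer`, `SegmentUpdaterRegistry`, and `UpsertCallbackSketch` are hypothetical stand-ins for the queue producer and segmentUpdater mentioned in the Javadoc; the method names mirror the diff but their parameters are simplified, and the real upsert implementation is not part of this PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical producer that buffers events for the key coordinator until flushed.
class BufferedKeyCoordinatorProducer {
  final List<Long> pending = new ArrayList<>();
  final List<Long> delivered = new ArrayList<>();

  void send(long offset) {
    pending.add(offset);
  }

  void flush() {
    delivered.addAll(pending);
    pending.clear();
  }
}

// Hypothetical stand-in for segmentUpdater: tracks which segments are still alive.
class SegmentUpdaterRegistry {
  final Set<String> liveSegments = new HashSet<>();
}

class UpsertCallbackSketch {
  private final String segmentName;
  private final BufferedKeyCoordinatorProducer producer;
  private final SegmentUpdaterRegistry registry;

  UpsertCallbackSketch(String segmentName, BufferedKeyCoordinatorProducer producer,
      SegmentUpdaterRegistry registry) {
    this.segmentName = segmentName;
    this.producer = producer;
    this.registry = registry;
    registry.liveSegments.add(segmentName);
  }

  // Per indexed row: queue an event for the key coordinator.
  void postIndexProcessing(long offset) {
    producer.send(offset);
  }

  // Consumption finished: nothing may remain buffered before the segment is committed.
  void postConsumeLoop() {
    producer.flush();
  }

  // Data manager destroyed (possibly much later, or an expired immutable segment
  // that never consumed): stop routing updates to it.
  void destroy() {
    registry.liveSegments.remove(segmentName);
  }
}
```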




[GitHub] [incubator-pinot] mcvsubbu commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-637813614


   @jamesyfshao I looked over your design doc; thanks for adding the section on the interfaces you need. I also looked at the repo you mentioned.
   It is not clear why we need all these interfaces. If you can add some clarifications to the design doc, that will help me understand better. We can then discuss online what the interface should be.
   @kishoreg if you have additional comments




[GitHub] [incubator-pinot] jamesyfshao commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-634800807


   @kishoreg @mcvsubbu @Jackie-Jiang I'd appreciate it if you could take a look at the diff this week when you have time




[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r446715531



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * Create an {@link IndexSegmentCallback} object so that SegmentDataManager can create an IndexSegment that supports
+   * either append or upsert mode
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create an UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();
+
+  /**
+   * Process the row after transformation in the LLRealtimeSegmentDataManager.processStreamEvents(...) method.
+   * This happens after the GenericRow has been transformed by RecordTransformer and before it is indexed by
+   * IndexSegmentImpl, so that we can provide other necessary data to the segment metadata.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will attach the offset object into the GenericRow object.
+   *
+   * We need this logic because in an upsert table the offset must be stored with the physical data. This lets us
+   * apply update events from the key coordinator to the upsert table correctly, since the offset is used as the
+   * index to identify which record's virtual column should be updated.
+   *
+   * @param row the row of newly ingested and transformed data from upstream
+   * @param offset the offset of this particular pinot record
+   */
+  void processTransformedRow(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * Process the row after indexing in the LLRealtimeSegmentDataManager.processStreamEvents(...) method.
+   * This happens after MutableSegmentImpl has indexed the current row into its physical storage.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will emit an event to the message queue that will deliver the event to
+   * key coordinator.
+   *
+   * This method ensures that we emit the metadata for a new entry that Pinot just indexed into its internal storage,
+   * so that the key coordinator can consume those events and process the updates correctly.
+   *
+   * @param row the row we just indexed in the current segment
+   * @param offset the offset associated with the current row
+   */
+  void postIndexProcessing(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * Perform any necessary finalization after the consume loop in LLRealtimeSegmentDataManager.consumeLoop(...)
+   * finishes, i.e. after the loop exits because it has reached its end criteria.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will flush the queue producer to ensure all pending messages are delivered
+   * to the queue between pinot server and pinot key-coordinator
+   *
+   * This method ensures that the Pinot server eventually sends all events to the key coordinator before a segment
+   * is committed. Otherwise, we might lose data in case of machine failure.
+   */
+  void postConsumeLoop();

Review comment:
       Updated the method name to onConsumptionStoppedOrEndReached to indicate that this method is meant to be called when consumption stops or the end condition is reached. It is currently fine to call it multiple times, e.g. if we get a catch-up call. The purpose of the method is stated in its doc: ensure that the Pinot server eventually sends all events to the key coordinator before a segment is committed. Do you think the new name is clear enough to a reader, or should we add more to the method documentation?
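A sketch of why calling the renamed hook more than once is harmless, assuming the upsert implementation does nothing but flush a buffered producer. `BufferedProducer` and `UpsertConsumptionHook` are hypothetical in-memory stand-ins; a real queue producer to the key coordinator would deliver over the network.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the producer feeding the key-coordinator queue.
class BufferedProducer {
  private final List<String> pending = new ArrayList<>();
  private final List<String> delivered = new ArrayList<>();

  void send(String event) { pending.add(event); }

  // Delivers everything buffered so far; a second call with nothing pending is a no-op.
  void flush() {
    delivered.addAll(pending);
    pending.clear();
  }

  int pendingCount() { return pending.size(); }

  List<String> delivered() { return delivered; }
}

// Sketch of the renamed hook: safe to invoke repeatedly (e.g. after a catch-up call).
class UpsertConsumptionHook {
  private final BufferedProducer producer;

  UpsertConsumptionHook(BufferedProducer producer) { this.producer = producer; }

  void onConsumptionStoppedOrEndReached() {
    producer.flush();  // all pending events reach the queue before the segment commits
  }
}
```

Since a flush with an empty buffer changes nothing, a second invocation neither loses nor duplicates events in this model.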






[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r446714977



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * Create an {@link IndexSegmentCallback} object so that SegmentDataManager can create an IndexSegment that supports
+   * either append or upsert mode
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create an UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();

Review comment:
       So data management in Pinot has mainly 3 layers:
   
   1. TableDataManager: this is the class that decides when a segment should be added to or removed from a Pinot server, and handles all segment locks. We add a callback for this class, _TableDataManagerCallback_, so that we can add listeners for segment addition and removal in Pinot.
   
   2. DataManager: this is the class that handles ingestion from the input stream. Because some information about the input stream is visible only at this layer (e.g. offset information), we are adding _DataManagerCallback_, which interacts with the offset information and sends information to the key-coordinator queue here to keep it consistent.
   
   3. IndexSegment: this is the class that handles the interaction between incoming data and the underlying storage (either in-memory or on-disk). Some information is visible only at this layer (actual data storage, docId information, etc.). We are adding _IndexSegmentCallback_, which interacts with the physical data storage and accesses the docId information here.
   
   We *could* merge all 3 of these callbacks into one callback class and have all 3 managers listed above interact with that single callback, but I feel that would make the purpose of each callback unclear and the intention of each method hard to understand. Separating the callback classes keeps each one focused on its own interactions and self-contained. However, it also means we need a way to create the appropriate callback objects from the higher-level callback object (the same way we create a DataManager from a TableDataManager, and an IndexSegment from a DataManager).
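To illustrate the layering, here is a hypothetical sketch in which each higher-level callback creates the callback for the layer below, mirroring how a TableDataManager creates a DataManager and a DataManager creates an IndexSegment. All interface and class names are simplified stand-ins invented for this example, not the PR's actual types; only `getIndexSegmentCallback()` mirrors a method from the PR.

```java
// Hypothetical stand-in for IndexSegmentCallback (lowest layer).
interface SegmentLayerCallback {
  String mode();
}

// Hypothetical stand-in for DataManagerCallback; creates the layer below.
interface IngestionLayerCallback {
  SegmentLayerCallback getIndexSegmentCallback();
}

// Hypothetical stand-in for TableDataManagerCallback; one ingestion callback per segment.
interface TableLayerCallback {
  IngestionLayerCallback getDataManagerCallback(String segmentName);
}

// Append tables: every layer is a no-op, so existing ingestion is unchanged.
class AppendSegmentLayerCallback implements SegmentLayerCallback {
  public String mode() { return "append"; }
}

class AppendIngestionLayerCallback implements IngestionLayerCallback {
  public SegmentLayerCallback getIndexSegmentCallback() { return new AppendSegmentLayerCallback(); }
}

class AppendTableLayerCallback implements TableLayerCallback {
  public IngestionLayerCallback getDataManagerCallback(String segmentName) {
    return new AppendIngestionLayerCallback();
  }
}

// Upsert tables: same shape; a real implementation would also carry per-segment
// state such as the key-coordinator producer and virtual-column indexes.
class UpsertSegmentLayerCallback implements SegmentLayerCallback {
  public String mode() { return "upsert"; }
}

class UpsertIngestionLayerCallback implements IngestionLayerCallback {
  public SegmentLayerCallback getIndexSegmentCallback() { return new UpsertSegmentLayerCallback(); }
}

class UpsertTableLayerCallback implements TableLayerCallback {
  public IngestionLayerCallback getDataManagerCallback(String segmentName) {
    return new UpsertIngestionLayerCallback();
  }
}
```

With this shape, only the table-level implementation has to be chosen (for instance from config, as the PR description suggests), and the lower layers follow from it.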






[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r446713828



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {

Review comment:
       > the interface definition should go into pinot-spi
   
   I looked into the option of moving them to the pinot-spi package. However, I don't think we can do that at the moment, because the IndexSegmentCallback class needs to read the internal data of the offset column to build the offset -> docId mapping. Without this information we cannot update the virtual columns of records by looking up their offset. And because the offset column data are stored in forward-index-related classes that are visible only in the pinot-core package, moving the interfaces to pinot-spi would make it hard to achieve the same functionality.
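A minimal sketch of the offset -> docId bookkeeping this comment depends on, assuming offsets are plain `long` values and using a `HashMap` as a stand-in for the virtual column forward index. `OffsetDocIdIndex`, `onRowIndexed` and `applyValidUntil` are hypothetical names; per the comment, the real implementation reads the offset column through pinot-core's forward-index classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: maps each stream offset to the docId assigned at indexing time,
// so an update event identified only by offset can reach the right row.
class OffsetDocIdIndex {
  private final Map<Long, Integer> offsetToDocId = new HashMap<>();
  // Stand-in for a $validUntil virtual column forward index (docId -> value).
  private final Map<Integer, Long> validUntil = new HashMap<>();

  // Called after a row is indexed (cf. IndexSegmentCallback.postProcessRecords(row, docId)).
  void onRowIndexed(long offset, int docId) {
    offsetToDocId.put(offset, docId);
  }

  // Applies a key-coordinator update addressed by offset; returns false if the offset is unknown.
  boolean applyValidUntil(long offset, long newValidUntil) {
    Integer docId = offsetToDocId.get(offset);
    if (docId == null) {
      return false;
    }
    validUntil.put(docId, newValidUntil);
    return true;
  }

  // Returns null if the row's virtual column has not been set.
  Long validUntilOf(int docId) {
    return validUntil.get(docId);
  }
}
```

The lookup in `applyValidUntil` is the step that needs the offset column's data, which is what ties this logic to segment internals.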
   
   
   
   > A suggestion to make it more readable. All callback methods should be named starting with "on". So, `onSegmentConsumingToOnline` or `onSegmentOfflineToOnline`, or `onSegmentStartConsuming`, etc.
   
   done 






[GitHub] [incubator-pinot] jamesyfshao commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-661665986


   > @jamesyfshao can we take the discussion to the design doc? I had asked a few questions there, but they are not resolved yet. Please see my comments on the "New Interface" section.
   > 
   > https://docs.google.com/document/d/1SFFir7ByxCff-aVYxQeTHpNhPXeP5q7P4g_6O2iNGgU/edit
   
   Hi @mcvsubbu, sorry for the late replies; I was a bit busy last week. I have addressed your comments in the design doc again. Please let me know if any of them need further clarification. 
   
   Sorry I didn't address them sooner. That is mostly because I have been trying to resolve the clarification issues in the code documentation instead of the design doc. I have spent more effort on making sure the method names are clear and the code comments are accurate, because I feel it is always easier for future Pinot maintainers to understand the actual logic by reading how the code interacts with other components, along with its comments, than by finding a particular section in an external design doc. This feature is also very new and expected to keep evolving for the foreseeable future, so the design of the actual logic will continue to change, and code comments are more likely to stay up to date than a static design doc. That is my reasoning for putting more emphasis on code comments instead of listing all the API interfaces in a design doc. 




[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r446716242



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/IndexSegmentCallback.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.Map;
+
+/**
+ * Component injected into {@link IndexSegment} for handling extra logic for
+ * upsert-enabled pinot ingestion mode
+ */
+@InterfaceStability.Evolving
+public interface IndexSegmentCallback {
+
+  /**
+   * Initialize the callback from {@link IndexSegment}. This happens in the constructor of the IndexSegment implementation class
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will initialize the necessary virtual columns for upsert table
+   * (offset mapping, $validFrom, $validUntil columns)
+   *
+   * This method ensures that the virtual column indexes (forward/inverted index) are created properly and that any
+   * existing data for the virtual columns is loaded from local storage
+   *
+   * @param segmentMetadata the metadata associated with the current segment
+   * @param columnIndexContainerMap mapping of necessary column names to their associated columnIndexContainer
+   */
+  void init(SegmentMetadata segmentMetadata, Map<String, ColumnIndexContainer> columnIndexContainerMap);
+
+  /**
+   * Perform the callback's operation for the given row after it has been processed and indexed.
+   * This method happens after indexing finished in MutableSegmentImpl.index(GenericRow, RowMetadata) method
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will build the offset mapping and update the virtual columns if necessary
+   *
+   * This method is similar to
+   * {@link org.apache.pinot.core.data.manager.callback.DataManagerCallback#postIndexProcessing(GenericRow, StreamPartitionMsgOffset)}
+   * However, that method does not have the docId, which upsert virtual columns need in order to build their forward
+   * index and to build the offset -> docId mapping from the offset column. We also might need to update the virtual
+   * column forward index for the newly ingested row.
+   *
+   * @param row the current pinot row we just indexed into the current IndexSegment
+   * @param docId the docId of this record
+   */
+  void postProcessRecords(GenericRow row, int docId);
+
+  /**
+   * Retrieve information related to an upsert-enabled segment's virtual columns for debugging purposes
+   *
+   * @param offset the offset of the record whose virtual column data we are trying to get
+   * @return string representation of the virtual column data information
+   */
+  String getVirtualColumnInfo(StreamPartitionMsgOffset offset);

Review comment:
       done






[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on a change in pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#discussion_r446713946



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/callback/DataManagerCallback.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.callback;
+
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.io.IOException;
+
+/**
+ * Component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logic for
+ * upsert-enabled pinot ingestion mode.
+ */
+@InterfaceStability.Evolving
+public interface DataManagerCallback {
+
+  void init() throws IOException;
+
+  /**
+   * Create an {@link IndexSegmentCallback} object so that SegmentDataManager can create an IndexSegment that supports
+   * either append or upsert mode
+   *
+   * In append-tables callback, this method will create a DefaultIndexSegmentCallback
+   * In upsert-tables callback, this method will create an UpsertDataManagerCallbackImpl
+   *
+   * The {@link IndexSegmentCallback} will be used in the constructor of IndexSegment
+   */
+  IndexSegmentCallback getIndexSegmentCallback();
+
+  /**
+   * Process the row after transformation in the LLRealtimeSegmentDataManager.processStreamEvents(...) method.
+   * This happens after the GenericRow has been transformed by RecordTransformer and before it is indexed by
+   * IndexSegmentImpl, so that we can provide other necessary data to the segment metadata.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will attach the offset object into the GenericRow object.
+   *
+   * We need this logic because in an upsert table the offset must be stored with the physical data. This lets us
+   * apply update events from the key coordinator to the upsert table correctly, since the offset is used as the
+   * index to identify which record's virtual column should be updated.
+   *
+   * @param row the row of newly ingested and transformed data from upstream
+   * @param offset the offset of this particular pinot record
+   */
+  void processTransformedRow(GenericRow row, StreamPartitionMsgOffset offset);
+
+  /**
+   * Process the row after indexing in the LLRealtimeSegmentDataManager.processStreamEvents(...) method.
+   * This happens after MutableSegmentImpl has indexed the current row into its physical storage.
+   *
+   * In append-tables callback, this method will do nothing
+   * In upsert-tables callback, this method will emit an event to the message queue that will deliver the event to
+   * key coordinator.
+   *
+   * This method ensures that we emit the metadata for a new entry that Pinot just indexed into its internal storage,
+   * so that the key coordinator can consume those events and process the updates correctly.
+   *
+   * @param row the row we just indexed in the current segment
+   * @param offset the offset associated with the current row
+   */
+  void postIndexProcessing(GenericRow row, StreamPartitionMsgOffset offset);

Review comment:
       Updated the documentation to indicate that this is for LLC and for indexing only (not aggregation)
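A simplified sketch of where the two row-level hooks could sit in the LLC consume step. It assumes one reading of "index only (not aggregation)": `postIndexProcessing` fires only when a row becomes a new document, not when metrics aggregation folds it into an existing one. Rows are plain maps and offsets are `long`s instead of `GenericRow` and `StreamPartitionMsgOffset`; `RowCallback`, `ConsumeStepDemo` and `Indexer` are hypothetical names.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the two row-level DataManagerCallback hooks.
interface RowCallback {
  void processTransformedRow(Map<String, Object> row, long offset);
  void postIndexProcessing(Map<String, Object> row, long offset);
}

class ConsumeStepDemo {
  // Stand-in for the mutable segment: returns true if the row became a new document,
  // false if it was folded into an existing document (metrics aggregation).
  interface Indexer {
    boolean index(Map<String, Object> row);
  }

  static void processOne(Map<String, Object> row, long offset, RowCallback cb, Indexer indexer) {
    cb.processTransformedRow(row, offset);  // upsert: attach the offset to the row before indexing
    boolean indexedAsNewDoc = indexer.index(row);
    if (indexedAsNewDoc) {
      cb.postIndexProcessing(row, offset);  // upsert: emit an event for the key coordinator
    }
  }
}
```

Under this reading, the key coordinator only hears about rows that actually occupy their own docId.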






[GitHub] [incubator-pinot] jamesyfshao commented on pull request #5394: add callback interface for upsert component

Posted by GitBox <gi...@apache.org>.
jamesyfshao commented on pull request #5394:
URL: https://github.com/apache/incubator-pinot/pull/5394#issuecomment-657372831


   @mcvsubbu pinging you again to see if you have time this week to check whether the changes look good

