Posted to commits@pinot.apache.org by "raghavgautam (via GitHub)" <gi...@apache.org> on 2023/04/18 22:33:14 UTC

[GitHub] [pinot] raghavgautam opened a new pull request, #10639: changing the dedup store to become pluggable

raghavgautam opened a new pull request, #10639:
URL: https://github.com/apache/pinot/pull/10639

   This is the first patch for the memory problem discussed in https://github.com/apache/pinot/issues/10571
   
   It makes the LocalKeyValueStore pluggable. Currently, the store is hard-coded to a ConcurrentHashMap. This change makes it easy to plug in different implementations.
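
As a rough illustration of what "pluggable" could mean here, the sketch below picks a store implementation by class name and falls back to an in-memory map when no name is configured. The names `KeyValueStoreSketch`, `InMemoryStoreSketch`, and `StoreLoaderSketch` are hypothetical, and plain reflection stands in for whatever plugin loading the patch actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down store interface (not the interface from the patch).
interface KeyValueStoreSketch {
  byte[] get(byte[] key);

  void put(byte[] key, byte[] value);
}

// Default in-memory implementation, used when no class name is configured.
class InMemoryStoreSketch implements KeyValueStoreSketch {
  private final Map<String, byte[]> _map = new ConcurrentHashMap<>();

  // ISO-8859-1 maps every byte to exactly one char, so the String key compares by byte content.
  private static String asKey(byte[] key) {
    return new String(key, StandardCharsets.ISO_8859_1);
  }

  @Override
  public byte[] get(byte[] key) {
    return _map.get(asKey(key));
  }

  @Override
  public void put(byte[] key, byte[] value) {
    _map.put(asKey(key), value);
  }
}

// Hypothetical loader: chooses the implementation from a configured class name.
final class StoreLoaderSketch {
  private StoreLoaderSketch() {
  }

  // Returns the default store for a null or empty name, otherwise instantiates the named class.
  static KeyValueStoreSketch create(String className) {
    if (className == null || className.isEmpty()) {
      return new InMemoryStoreSketch();
    }
    try {
      return Class.forName(className)
          .asSubclass(KeyValueStoreSketch.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot create key-value store: " + className, e);
    }
  }
}
```

With this shape, callers depend only on the interface, so swapping in another implementation would be a configuration change rather than a code change.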


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1174185026


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java:
##########
@@ -114,12 +146,13 @@ public PrimaryKey next() {
   }
 
   public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) {
-    boolean present =
-        _primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment) != null;
-    if (!present) {
+    byte[] keyBytes = serializePrimaryKey(HashUtils.hashPrimaryKey(pk, _dedupConfig.getHashFunction()));
+    if (Objects.isNull(_keyValueStore.get(keyBytes))) {

Review Comment:
   Will fix this by bringing back `putIfAbsent`.




[GitHub] [pinot] yupeng9 commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "yupeng9 (via GitHub)" <gi...@apache.org>.
yupeng9 commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1171885153


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java:
##########
@@ -41,4 +48,8 @@ public HashFunction getHashFunction() {
   public boolean isDedupEnabled() {
     return _dedupEnabled;
   }
+
+  public String getKeyStore() {
+    return _keyStore;

Review Comment:
   Right, can you add a comment on the default implementation of this store?




[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1171725569


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStore.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+
+public class ConcurrentHashMapKeyValueStore implements LocalKeyValueStore {
+
+  private final Map<ByteArray, byte[]> _map;
+
+  public ConcurrentHashMapKeyValueStore(byte[] id) {

Review Comment:
   For the ConcurrentHashMap store this is unused; I will remove it.




[GitHub] [pinot] yupeng9 merged pull request #10639: changing the dedup store to become pluggable

Posted by "yupeng9 (via GitHub)" <gi...@apache.org>.
yupeng9 merged PR #10639:
URL: https://github.com/apache/pinot/pull/10639



[GitHub] [pinot] yupeng9 commented on pull request #10639: changing the dedup store to become pluggable

Posted by "yupeng9 (via GitHub)" <gi...@apache.org>.
yupeng9 commented on PR #10639:
URL: https://github.com/apache/pinot/pull/10639#issuecomment-1516784366

   LGTM. @Jackie-Jiang can you also take a look?



[GitHub] [pinot] codecov-commenter commented on pull request #10639: changing the dedup store to become pluggable

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10639:
URL: https://github.com/apache/pinot/pull/10639#issuecomment-1513905597

   ## [Codecov](https://codecov.io/gh/apache/pinot/pull/10639?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#10639](https://codecov.io/gh/apache/pinot/pull/10639?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8e60fff) into [master](https://codecov.io/gh/apache/pinot/commit/f6c6d14540500867a167859a05e6822b60ce25c2?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f6c6d14) will **decrease** coverage by `45.95%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #10639       +/-   ##
   =============================================
   - Coverage     70.35%   24.41%   -45.95%     
   + Complexity     6464       49     -6415     
   =============================================
     Files          2103     2090       -13     
     Lines        112769   112959      +190     
     Branches      16981    17038       +57     
   =============================================
   - Hits          79341    27581    -51760     
   - Misses        27877    82446    +54569     
   + Partials       5551     2932     -2619     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `24.41% <0.00%> (-0.05%)` | :arrow_down: |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/10639?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ata/manager/realtime/RealtimeTableDataManager.java](https://codecov.io/gh/apache/pinot/pull/10639?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUmVhbHRpbWVUYWJsZURhdGFNYW5hZ2VyLmphdmE=) | `39.15% <ø> (-28.27%)` | :arrow_down: |
   | [...nt/local/dedup/ConcurrentHashMapKeyValueStore.java](https://codecov.io/gh/apache/pinot/pull/10639?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9kZWR1cC9Db25jdXJyZW50SGFzaE1hcEtleVZhbHVlU3RvcmUuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ent/local/dedup/PartitionDedupMetadataManager.java](https://codecov.io/gh/apache/pinot/pull/10639?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9kZWR1cC9QYXJ0aXRpb25EZWR1cE1ldGFkYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (-66.00%)` | :arrow_down: |
   | [...segment/local/dedup/TableDedupMetadataManager.java](https://codecov.io/gh/apache/pinot/pull/10639?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9kZWR1cC9UYWJsZURlZHVwTWV0YWRhdGFNYW5hZ2VyLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...org/apache/pinot/spi/config/table/DedupConfig.java](https://codecov.io/gh/apache/pinot/pull/10639?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL0RlZHVwQ29uZmlnLmphdmE=) | `0.00% <0.00%> (-83.34%)` | :arrow_down: |
   
   ... and [1588 files with indirect coverage changes](https://codecov.io/gh/apache/pinot/pull/10639/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   



[GitHub] [pinot] ankitsultana commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1173197102


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java:
##########
@@ -114,12 +146,13 @@ public PrimaryKey next() {
   }
 
   public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) {
-    boolean present =
-        _primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment) != null;
-    if (!present) {
+    byte[] keyBytes = serializePrimaryKey(HashUtils.hashPrimaryKey(pk, _dedupConfig.getHashFunction()));
+    if (Objects.isNull(_keyValueStore.get(keyBytes))) {

Review Comment:
   This is a non-atomic operation (contrast with `putIfAbsent`).
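
To make the concern concrete: a `get` followed by a separate `put` leaves a window in which two threads can both see the key as absent, whereas `ConcurrentHashMap.putIfAbsent` performs the check and the insert as one atomic step. A minimal sketch, using a hypothetical `DedupCheckSketch` class with string keys and values rather than the PR's types:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the dedup lookup; keys and values are simplified to strings.
final class DedupCheckSketch {
  private final ConcurrentHashMap<String, String> _keyToSegment = new ConcurrentHashMap<>();

  // Racy: another thread may insert the same key between get() and put().
  boolean checkPresentOrUpdateRacy(String key, String segment) {
    if (_keyToSegment.get(key) == null) {
      _keyToSegment.put(key, segment);
      return false;
    }
    return true;
  }

  // Atomic: putIfAbsent returns the previous value, or null if this call inserted the key.
  boolean checkPresentOrUpdateAtomic(String key, String segment) {
    return _keyToSegment.putIfAbsent(key, segment) != null;
  }

  // Returns the segment currently recorded for the key, or null if there is none.
  String segmentFor(String key) {
    return _keyToSegment.get(key);
  }
}
```

In the atomic variant, exactly one of several concurrent callers for the same key gets `false` back, and the first recorded segment is never overwritten.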



##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/dedup/LocalKeyValueStore.java:
##########
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public interface LocalKeyValueStore {
+    byte[] get(byte[] key);

Review Comment:
   Can we use generics here so that the ConcurrentHashMap key-value store can skip serialization?
   
   If we stick with the in-memory hash map, we wouldn't want to add serialization overhead during ingestion.
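
One way to read this suggestion, sketched under assumptions: parameterize the store on key and value types so that an in-memory implementation can hold objects directly, while a disk-backed one could still bind both parameters to `byte[]`. `GenericStoreSketch` and `InMemoryGenericStoreSketch` are illustrative names and are not part of the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical generic variant of the store interface.
interface GenericStoreSketch<K, V> {
  V get(K key);

  V putIfAbsent(K key, V value);

  long getKeyCount();
}

// In-memory implementation: keys and values stay as objects, so nothing is serialized.
// K is assumed to implement content-based equals() and hashCode().
final class InMemoryGenericStoreSketch<K, V> implements GenericStoreSketch<K, V> {
  private final Map<K, V> _map = new ConcurrentHashMap<>();

  @Override
  public V get(K key) {
    return _map.get(key);
  }

  @Override
  public V putIfAbsent(K key, V value) {
    return _map.putIfAbsent(key, value);
  }

  @Override
  public long getKeyCount() {
    return _map.size();
  }
}
```

The trade-off is that the caller then has to know which type parameters a configured store expects, which a `byte[]`-only interface avoids.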



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStoreTest.java:
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ConcurrentHashMapKeyValueStoreTest {

Review Comment:
   Code format seems incorrect. (We use 2 spaces instead of 4 for indentation.)




[GitHub] [pinot] ankitsultana commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1175605176


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStore.java:
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+import org.apache.pinot.spi.utils.ByteArray;
+
+public class ConcurrentHashMapKeyValueStore implements LocalKeyValueStore {
+
+  private final Map<ByteArray, byte[]> _map;
+
+  public ConcurrentHashMapKeyValueStore() {
+    _map = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return _map.get(new ByteArray(key));
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    _map.remove(new ByteArray(key));
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    _map.put(new ByteArray(key), value);
+  }
+  public byte[] putIfAbsent(byte[] key, byte[] value) {

Review Comment:
   Missing space between methods.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/dedup/LocalKeyValueStore.java:
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public interface LocalKeyValueStore {

Review Comment:
   Let's also add doc comments. Particularly for `compact`.
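
As an illustration of the kind of documentation being asked for, here is a sketch with Javadoc on each method. The described semantics, in particular for `compact`, are assumptions made for the example; the only detail taken from this thread is that the in-memory store implements `compact` as an empty method. `DocumentedStoreSketch` and `NoOpCompactStoreSketch` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical documented store; the semantics described here are assumptions. */
interface DocumentedStoreSketch {
  /** Returns the value stored for {@code key}, or {@code null} if the key is absent. */
  byte[] get(byte[] key);

  /** Stores {@code value} under {@code key}, replacing any existing value. */
  void put(byte[] key, byte[] value);

  /** Returns the number of keys currently held by the store. */
  long getKeyCount();

  /**
   * Asks the store to reclaim space left behind by deleted or overwritten entries.
   * Assumed to leave the visible contents unchanged; stores with nothing to reclaim may do nothing.
   */
  void compact();
}

/** In-memory implementation for which compaction is a no-op. */
final class NoOpCompactStoreSketch implements DocumentedStoreSketch {
  // ByteBuffer provides content-based equals/hashCode; keys are copied so later mutation is harmless.
  private final Map<ByteBuffer, byte[]> _map = new ConcurrentHashMap<>();

  private static ByteBuffer wrap(byte[] key) {
    return ByteBuffer.wrap(key.clone());
  }

  @Override
  public byte[] get(byte[] key) {
    return _map.get(wrap(key));
  }

  @Override
  public void put(byte[] key, byte[] value) {
    _map.put(wrap(key), value);
  }

  @Override
  public long getKeyCount() {
    return _map.size();
  }

  @Override
  public void compact() {
    // Nothing to reclaim for a heap-resident map.
  }
}
```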



##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/dedup/LocalKeyValueStore.java:
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public interface LocalKeyValueStore {
+  byte[] get(byte[] key);
+
+  void delete(byte[] key);
+
+  void put(byte[] key, byte[] value);
+  byte[] putIfAbsent(byte[] key, byte[] value);

Review Comment:
   Missing space between methods.




[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1175796693


##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/dedup/LocalKeyValueStore.java:
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public interface LocalKeyValueStore {
+  byte[] get(byte[] key);
+
+  void delete(byte[] key);
+
+  void put(byte[] key, byte[] value);
+  byte[] putIfAbsent(byte[] key, byte[] value);

Review Comment:
   fixed.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/dedup/LocalKeyValueStore.java:
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public interface LocalKeyValueStore {

Review Comment:
   added




[GitHub] [pinot] raghavgautam commented on pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on PR #10639:
URL: https://github.com/apache/pinot/pull/10639#issuecomment-1532006112

   @Jackie-Jiang I have addressed your comment. Can you take another look?



[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1172057256


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStore.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+
+public class ConcurrentHashMapKeyValueStore implements LocalKeyValueStore {
+
+  private final Map<ByteArray, byte[]> _map;
+
+  public ConcurrentHashMapKeyValueStore(byte[] id) {
+    _map = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return _map.get(new ByteArray(key));
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    _map.remove(new ByteArray(key));
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    _map.put(new ByteArray(key), value);
+  }
+
+  @Override
+  public void putBatch(List<Pair<byte[], byte[]>> keyValues) {
+    keyValues.forEach(pair -> _map.put(new ByteArray(pair.getKey()), pair.getValue()));
+  }
+
+  @Override
+  public long getKeyCount() {
+    return _map.size();
+  }
+
+  @Override
+  @VisibleForTesting
+  public void compact() {
+  }
+
+  private static final class ByteArray {

Review Comment:
   Given that it has an extra field for storing the hash, it will consume 4 more bytes per key.
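
For context, a `byte[]` cannot serve directly as a map key because arrays use identity-based `equals` and `hashCode`, hence the wrapper. The sketch below shows a hypothetical `ByteArrayKeySketch` that caches its hash in an `int` field, which is the per-key cost referred to in this comment. It is not the `ByteArray` class from the PR, and the real footprint also depends on JVM object alignment.

```java
import java.util.Arrays;

// Hypothetical wrapper that gives byte[] content-based equality for use as a map key.
final class ByteArrayKeySketch {
  private final byte[] _bytes;
  // Cached hash: avoids rehashing on every lookup at the cost of one int field per key.
  private final int _hash;

  ByteArrayKeySketch(byte[] bytes) {
    _bytes = bytes;
    _hash = Arrays.hashCode(bytes);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ByteArrayKeySketch)) {
      return false;
    }
    return Arrays.equals(_bytes, ((ByteArrayKeySketch) o)._bytes);
  }

  @Override
  public int hashCode() {
    return _hash;
  }
}
```

Dropping the cached field would save the `int` but recompute `Arrays.hashCode` on every map operation, so the choice is a memory versus CPU trade-off.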




[GitHub] [pinot] yupeng9 commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "yupeng9 (via GitHub)" <gi...@apache.org>.
yupeng9 commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1170760727


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStore.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+
+public class ConcurrentHashMapKeyValueStore implements LocalKeyValueStore {
+
+  private final Map<ByteArray, byte[]> _map;
+
+  public ConcurrentHashMapKeyValueStore(byte[] id) {

Review Comment:
   The `id` input is not used here?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java:
##########
@@ -39,46 +44,73 @@ public class PartitionDedupMetadataManager {
   private final List<String> _primaryKeyColumns;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
-  private final HashFunction _hashFunction;
-
-  @VisibleForTesting
-  final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
+  private final DedupConfig _dedupConfig;
+  @VisibleForTesting final LocalKeyValueStore _keyValueStore;
 
   public PartitionDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
-      ServerMetrics serverMetrics, HashFunction hashFunction) {
+      ServerMetrics serverMetrics, DedupConfig dedupConfig) {
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
-    _hashFunction = hashFunction;
+    _dedupConfig = dedupConfig;
+    try {
+      byte[] id = (tableNameWithType + "@" + partitionId).getBytes();
+      _keyValueStore = StringUtils.isEmpty(dedupConfig.getKeyStore())
+              ? new ConcurrentHashMapKeyValueStore(id)
+              : PluginManager.get().createInstance(
+                      dedupConfig.getKeyStore(),
+                      new Class[]{byte[].class},
+                      new Object[]{id}
+              );
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public void addSegment(IndexSegment segment) {
-    // Add all PKs to _primaryKeyToSegmentMap
     Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
+    byte[] serializedSegment = serializeSegment(segment);
+    List<Pair<byte[], byte[]>> keyValues = new ArrayList<>();
     while (primaryKeyIterator.hasNext()) {
       PrimaryKey pk = primaryKeyIterator.next();
-      _primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction), segment);
+      byte[] serializedPrimaryKey = serializePrimaryKey(HashUtils.hashPrimaryKey(pk, _dedupConfig.getHashFunction()));
+      keyValues.add(Pair.of(serializedPrimaryKey, serializedSegment));
     }
+    _keyValueStore.putBatch(keyValues);
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
-        _primaryKeyToSegmentMap.size());
+        _keyValueStore.getKeyCount());
+  }
+
+  @VisibleForTesting
+  static byte[] serializeSegment(IndexSegment segment) {
+    return segment.getSegmentName().getBytes();
+  }
+
+  @VisibleForTesting
+  static byte[] serializePrimaryKey(Object pk) {
+    if (pk instanceof PrimaryKey) {
+      return ((PrimaryKey) pk).asBytes();
+    }
+    if (pk instanceof ByteArray) {
+      return ((ByteArray) pk).getBytes();
+    }
+    throw new RuntimeException("Invalid primary key: " + pk);

Review Comment:
   The message would be better as "unsupported pk class".



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java:
##########
@@ -41,4 +48,8 @@ public HashFunction getHashFunction() {
   public boolean isDedupEnabled() {
     return _dedupEnabled;
   }
+
+  public String getKeyStore() {
+    return _keyStore;

Review Comment:
   Does this have a default value?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java:
##########
@@ -39,46 +44,73 @@ public class PartitionDedupMetadataManager {
   private final List<String> _primaryKeyColumns;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
-  private final HashFunction _hashFunction;
-
-  @VisibleForTesting
-  final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
+  private final DedupConfig _dedupConfig;
+  @VisibleForTesting final LocalKeyValueStore _keyValueStore;
 
   public PartitionDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
-      ServerMetrics serverMetrics, HashFunction hashFunction) {
+      ServerMetrics serverMetrics, DedupConfig dedupConfig) {
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
-    _hashFunction = hashFunction;
+    _dedupConfig = dedupConfig;
+    try {
+      byte[] id = (tableNameWithType + "@" + partitionId).getBytes();

Review Comment:
   Why do we need the table name in the key?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/dedup/LocalKeyValueStore.java:
##########
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public interface LocalKeyValueStore {
+    byte[] get(byte[] key);
+
+    void delete(byte[] key);
+
+    void put(byte[] key, byte[] value);
+
+    void putBatch(List<Pair<byte[], byte[]>> keyValues);
+
+    long getKeyCount();

Review Comment:
   Does this need to be a long?





[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1171727323


##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/dedup/LocalKeyValueStore.java:
##########
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public interface LocalKeyValueStore {
+    byte[] get(byte[] key);
+
+    void delete(byte[] key);
+
+    void put(byte[] key, byte[] value);
+
+    void putBatch(List<Pair<byte[], byte[]>> keyValues);
+
+    long getKeyCount();

Review Comment:
   At 1K keys/sec this will reach 1B keys in about 12 days, and int only goes up to about 2.1B.
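
   The estimate above can be checked with a small calculation. This is only a sketch: it assumes a steady ingestion rate into a single store and no key expiry, and the class and method names are made up for illustration.

```java
final class KeyCountEstimate {
  // Days needed to accumulate targetKeys at a steady rate of keysPerSecond.
  static double daysToReach(long targetKeys, long keysPerSecond) {
    long keysPerDay = keysPerSecond * 86_400L; // 86,400 seconds in a day
    return (double) targetKeys / keysPerDay;
  }

  public static void main(String[] args) {
    // 1B keys at 1K keys/sec
    System.out.printf("days to 1B keys: %.1f%n", daysToReach(1_000_000_000L, 1_000L));
    // Point at which an int key count would overflow at the same rate
    System.out.printf("days to Integer.MAX_VALUE keys: %.1f%n", daysToReach(Integer.MAX_VALUE, 1_000L));
  }
}
```

   At that rate an int counter would run out after roughly 25 days, which is the motivation for returning a long.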





[GitHub] [pinot] yupeng9 commented on pull request #10639: changing the dedup store to become pluggable

Posted by "yupeng9 (via GitHub)" <gi...@apache.org>.
yupeng9 commented on PR #10639:
URL: https://github.com/apache/pinot/pull/10639#issuecomment-1540934693

   @Jackie-Jiang I made a pass on the most recent change, and it looks good to me. Feel free to review this post-merge when you are back.




[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1174186784


##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/dedup/LocalKeyValueStore.java:
##########
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public interface LocalKeyValueStore {
+    byte[] get(byte[] key);

Review Comment:
   If we use generics, the calling code will have to know that serialization is not needed, so it will be coupled to the ConcurrentHashMapKeyValueStore implementation. This is undesirable. The serialization that we are using is simple enough to not add significant overhead.





[GitHub] [pinot] yupeng9 commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "yupeng9 (via GitHub)" <gi...@apache.org>.
yupeng9 commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1171883891


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStore.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+
+public class ConcurrentHashMapKeyValueStore implements LocalKeyValueStore {
+
+  private final Map<ByteArray, byte[]> _map;
+
+  public ConcurrentHashMapKeyValueStore(byte[] id) {
+    _map = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return _map.get(new ByteArray(key));
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    _map.remove(new ByteArray(key));
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    _map.put(new ByteArray(key), value);
+  }
+
+  @Override
+  public void putBatch(List<Pair<byte[], byte[]>> keyValues) {
+    keyValues.forEach(pair -> _map.put(new ByteArray(pair.getKey()), pair.getValue()));
+  }
+
+  @Override
+  public long getKeyCount() {
+    return _map.size();
+  }
+
+  @Override
+  @VisibleForTesting
+  public void compact() {
+  }
+
+  private static final class ByteArray {

Review Comment:
   Can we use https://github.com/apache/pinot/blob/master/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java instead?
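
   For context on why a wrapper is needed at all: a raw byte[] uses identity-based equals/hashCode, so it cannot serve as a map key directly. The sketch below illustrates this with a hypothetical `BytesKey` class; it is not the Pinot `ByteArray` implementation, only a minimal stand-in for the same idea.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class ByteKeySketch {
  // Hypothetical wrapper giving a byte[] content-based equals/hashCode.
  static final class BytesKey {
    private final byte[] bytes;

    BytesKey(byte[] bytes) {
      this.bytes = bytes;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof BytesKey && Arrays.equals(bytes, ((BytesKey) o).bytes);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(bytes);
    }
  }

  // Looks up an equal-content but distinct array: misses, because arrays compare by identity.
  static boolean rawLookupHits() {
    Map<byte[], String> map = new HashMap<>();
    map.put(new byte[]{1, 2}, "segment");
    return map.get(new byte[]{1, 2}) != null;
  }

  // Same lookup through the wrapper: hits, because equality is content-based.
  static boolean wrappedLookupHits() {
    Map<BytesKey, String> map = new HashMap<>();
    map.put(new BytesKey(new byte[]{1, 2}), "segment");
    return map.get(new BytesKey(new byte[]{1, 2})) != null;
  }

  public static void main(String[] args) {
    System.out.println("raw byte[] key found: " + rawLookupHits());
    System.out.println("wrapped key found: " + wrappedLookupHits());
  }
}
```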





[GitHub] [pinot] yupeng9 commented on pull request #10639: changing the dedup store to become pluggable

Posted by "yupeng9 (via GitHub)" <gi...@apache.org>.
yupeng9 commented on PR #10639:
URL: https://github.com/apache/pinot/pull/10639#issuecomment-1529828906

   > > @Jackie-Jiang The serialization that we are using is simple enough to not add significant overhead. It will probably cost 100 or so CPU cycles per record. Moreover, this overhead happens only during ingestion time and does not affect query performance. Keeping the LocalKeyValueStore pluggable will make it reusable in other places as well.
   > 
   > It is not that trivial compared to other operations when updating a record. Adding a record to the KV store could be much cheaper than serializing a primary key, meaning this change could cause a several-fold performance degradation. Also, certain operations require reference comparison, and should be handled differently if reference comparison is not possible.
   
   @raghavgautam maybe we can run a quick benchmark to see how much overhead it adds; that would help decide at which layer we should do the abstraction.
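
   A quick benchmark along those lines might look like the sketch below. It is a rough illustration, not a measurement from this PR: the key format, the use of `ByteBuffer.wrap` as a stand-in for the byte-array wrapper, and all class and method names are assumptions, and a proper harness such as JMH would give more trustworthy numbers than `System.nanoTime()`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SerializationOverheadSketch {
  // Hypothetical primary keys; real keys would come from the segment.
  static String[] makeKeys(int n) {
    String[] keys = new String[n];
    for (int i = 0; i < n; i++) {
      keys[i] = "pk-" + i;
    }
    return keys;
  }

  // Baseline: key objects are stored directly, with no serialization step.
  static long timeDirect(String[] keys, Object segment, Map<String, Object> map) {
    long start = System.nanoTime();
    for (String key : keys) {
      map.putIfAbsent(key, segment);
    }
    return System.nanoTime() - start;
  }

  // Serialized path: each key is turned into bytes and wrapped so that it has
  // content-based equals/hashCode before being stored.
  static long timeSerialized(String[] keys, byte[] segmentBytes, Map<ByteBuffer, byte[]> map) {
    long start = System.nanoTime();
    for (String key : keys) {
      map.putIfAbsent(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)), segmentBytes);
    }
    return System.nanoTime() - start;
  }

  public static void main(String[] args) {
    String[] keys = makeKeys(1_000_000);
    byte[] segmentBytes = "segment_0".getBytes(StandardCharsets.UTF_8);
    // Repeat a few rounds so the later ones run on JIT-compiled code.
    for (int round = 0; round < 5; round++) {
      long direct = timeDirect(keys, new Object(), new ConcurrentHashMap<>());
      long serialized = timeSerialized(keys, segmentBytes, new ConcurrentHashMap<>());
      System.out.printf("round %d: direct=%d ms, serialized=%d ms%n",
          round, direct / 1_000_000, serialized / 1_000_000);
    }
  }
}
```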




[GitHub] [pinot] raghavgautam commented on pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on PR #10639:
URL: https://github.com/apache/pinot/pull/10639#issuecomment-1526110888

   > Making it pluggable at the KV store level will add extra overhead to the default implementation because it will force us to serialize everything. Instead, we can make it pluggable at the metadata manager level so that there is no performance penalty to the default implementation. You may take a look at #9186 for how we made the upsert metadata manager pluggable. This PR should be very similar to that.
   
   @Jackie-Jiang The serialization that we are using is simple enough to not add significant overhead. It will probably cost 100 or so CPU cycles per record. Moreover, this overhead happens only during ingestion time and does not affect query performance. Keeping the LocalKeyValueStore pluggable will make it reusable in other places as well.




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1179596748


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java:
##########
@@ -39,46 +44,73 @@ public class PartitionDedupMetadataManager {
   private final List<String> _primaryKeyColumns;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
-  private final HashFunction _hashFunction;
-
-  @VisibleForTesting
-  final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
+  private final DedupConfig _dedupConfig;
+  @VisibleForTesting final LocalKeyValueStore _keyValueStore;
 
   public PartitionDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
-      ServerMetrics serverMetrics, HashFunction hashFunction) {
+      ServerMetrics serverMetrics, DedupConfig dedupConfig) {
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
-    _hashFunction = hashFunction;
+    _dedupConfig = dedupConfig;
+    try {
+      byte[] id = (tableNameWithType + "@" + partitionId).getBytes();
+      _keyValueStore = StringUtils.isEmpty(dedupConfig.getKeyStore())
+              ? new ConcurrentHashMapKeyValueStore()
+              : PluginManager.get().createInstance(
+                      dedupConfig.getKeyStore(),
+                      new Class[]{byte[].class},
+                      new Object[]{id}
+              );
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public void addSegment(IndexSegment segment) {
-    // Add all PKs to _primaryKeyToSegmentMap
     Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
+    byte[] serializedSegment = serializeSegment(segment);
+    List<Pair<byte[], byte[]>> keyValues = new ArrayList<>();
     while (primaryKeyIterator.hasNext()) {
       PrimaryKey pk = primaryKeyIterator.next();
-      _primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction), segment);
+      byte[] serializedPrimaryKey = serializePrimaryKey(HashUtils.hashPrimaryKey(pk, _dedupConfig.getHashFunction()));
+      keyValues.add(Pair.of(serializedPrimaryKey, serializedSegment));
     }
+    _keyValueStore.putBatch(keyValues);
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
-        _primaryKeyToSegmentMap.size());
+        _keyValueStore.getKeyCount());
+  }
+
+  @VisibleForTesting
+  static byte[] serializeSegment(IndexSegment segment) {
+    return segment.getSegmentName().getBytes();
+  }
+
+  @VisibleForTesting
+  static byte[] serializePrimaryKey(Object pk) {
+    if (pk instanceof PrimaryKey) {
+      return ((PrimaryKey) pk).asBytes();
+    }
+    if (pk instanceof ByteArray) {
+      return ((ByteArray) pk).getBytes();
+    }
+    throw new RuntimeException("Unsupported pk class: " + pk);
   }
 
   public void removeSegment(IndexSegment segment) {
     // TODO(saurabh): Explain reload scenario here
     Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
+    byte[] segmentBytes = serializeSegment(segment);
     while (primaryKeyIterator.hasNext()) {
       PrimaryKey pk = primaryKeyIterator.next();
-      _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, _hashFunction), (primaryKey, currentSegment) -> {
-        if (currentSegment == segment) {
-          return null;
-        } else {
-          return currentSegment;
-        }
-      });
+      byte[] pkBytes = serializePrimaryKey(pk);
+      if (Objects.deepEquals(_keyValueStore.get(pkBytes), segmentBytes)) {

Review Comment:
   This can potentially cause a race condition when the segment is added again during the segment removal, which is why we performed the reference check before.
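
   The concern can be illustrated with a simplified, single-threaded sketch. It assumes the truncated diff goes on to delete the key when the serialized bytes match, and it uses a hypothetical `Segment` class in place of `IndexSegment`. After a reload, the old and new segment objects share a name, so a comparison on the serialized name cannot tell them apart, whereas the previous `compute()` with a reference check could (and was atomic as well).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RemoveSegmentSketch {
  // Hypothetical stand-in for IndexSegment: after a reload, two objects share one name.
  static final class Segment {
    final String name;

    Segment(String name) {
      this.name = name;
    }
  }

  // Previous style: one atomic compute() that removes the entry only if it
  // still points at the exact segment object being removed.
  static boolean entrySurvivesReferenceCheck() {
    Segment old = new Segment("seg_0");
    Segment reloaded = new Segment("seg_0");
    Map<String, Segment> map = new ConcurrentHashMap<>();
    map.put("pk1", reloaded); // the reloaded segment was already added
    map.compute("pk1", (pk, current) -> current == old ? null : current);
    return map.containsKey("pk1");
  }

  // Style in the diff (assumed): separate get and delete, comparing serialized names.
  static boolean entrySurvivesNameCheck() {
    Segment old = new Segment("seg_0");
    Segment reloaded = new Segment("seg_0");
    Map<String, byte[]> map = new ConcurrentHashMap<>();
    map.put("pk1", reloaded.name.getBytes(StandardCharsets.UTF_8));
    byte[] oldBytes = old.name.getBytes(StandardCharsets.UTF_8);
    if (Arrays.equals(map.get("pk1"), oldBytes)) {
      map.remove("pk1"); // also removes the entry that belongs to the reloaded segment
    }
    return map.containsKey("pk1");
  }

  public static void main(String[] args) {
    System.out.println("reference check keeps reloaded entry: " + entrySurvivesReferenceCheck());
    System.out.println("name check keeps reloaded entry: " + entrySurvivesNameCheck());
  }
}
```

   In the sketch the reference check keeps the reloaded segment's entry while the name-based check drops it. With real concurrency, the gap between get and delete adds a second window in which another thread can change the entry.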





[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1171726210


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java:
##########
@@ -39,46 +44,73 @@ public class PartitionDedupMetadataManager {
   private final List<String> _primaryKeyColumns;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
-  private final HashFunction _hashFunction;
-
-  @VisibleForTesting
-  final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
+  private final DedupConfig _dedupConfig;
+  @VisibleForTesting final LocalKeyValueStore _keyValueStore;
 
   public PartitionDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
-      ServerMetrics serverMetrics, HashFunction hashFunction) {
+      ServerMetrics serverMetrics, DedupConfig dedupConfig) {
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
-    _hashFunction = hashFunction;
+    _dedupConfig = dedupConfig;
+    try {
+      byte[] id = (tableNameWithType + "@" + partitionId).getBytes();

Review Comment:
   We don't need it for the ConcurrentHashMap implementation, but persistent stores need it to identify which table-partition combination we are storing or querying.
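
   As a rough illustration of how a persistent implementation might use the id, the sketch below namespaces every key with a length-prefixed id inside one shared store. Everything here is hypothetical: the class name, the in-memory map standing in for on-disk storage, and the prefixing scheme are assumptions, and a real plugin could just as well use the id as a directory or column-family name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class NamespacedStoreSketch {
  // Stand-in for a single shared persistent store; a real one would live on disk.
  private static final Map<ByteBuffer, byte[]> SHARED = new HashMap<>();

  private final byte[] id;

  NamespacedStoreSketch(String tableNameWithType, int partitionId) {
    // Same id shape as in the diff: table name, '@', partition id.
    this.id = (tableNameWithType + "@" + partitionId).getBytes(StandardCharsets.UTF_8);
  }

  // Prefix the key with the id length and the id so keys from different
  // table-partitions never collide.
  private ByteBuffer namespaced(byte[] key) {
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + id.length + key.length);
    buf.putInt(id.length).put(id).put(key);
    buf.flip();
    return buf;
  }

  void put(byte[] key, byte[] value) {
    SHARED.put(namespaced(key), value);
  }

  byte[] get(byte[] key) {
    return SHARED.get(namespaced(key));
  }

  public static void main(String[] args) {
    NamespacedStoreSketch p0 = new NamespacedStoreSketch("myTable_REALTIME", 0);
    NamespacedStoreSketch p1 = new NamespacedStoreSketch("myTable_REALTIME", 1);
    byte[] key = {42};
    p0.put(key, "seg_0".getBytes(StandardCharsets.UTF_8));
    System.out.println("partition 0 sees key: " + (p0.get(key) != null));
    System.out.println("partition 1 sees key: " + (p1.get(key) != null));
  }
}
```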





[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1174185364


##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStoreTest.java:
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ConcurrentHashMapKeyValueStoreTest {

Review Comment:
   Will fix. Unsure why checkstyle + CI didn't catch this.





[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1175797401


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStore.java:
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+import org.apache.pinot.spi.utils.ByteArray;
+
+public class ConcurrentHashMapKeyValueStore implements LocalKeyValueStore {
+
+  private final Map<ByteArray, byte[]> _map;
+
+  public ConcurrentHashMapKeyValueStore() {
+    _map = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return _map.get(new ByteArray(key));
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    _map.remove(new ByteArray(key));
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    _map.put(new ByteArray(key), value);
+  }
+  public byte[] putIfAbsent(byte[] key, byte[] value) {

Review Comment:
   fixed





[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1171726625


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java:
##########
@@ -39,46 +44,73 @@ public class PartitionDedupMetadataManager {
   private final List<String> _primaryKeyColumns;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
-  private final HashFunction _hashFunction;
-
-  @VisibleForTesting
-  final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
+  private final DedupConfig _dedupConfig;
+  @VisibleForTesting final LocalKeyValueStore _keyValueStore;
 
   public PartitionDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
-      ServerMetrics serverMetrics, HashFunction hashFunction) {
+      ServerMetrics serverMetrics, DedupConfig dedupConfig) {
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
-    _hashFunction = hashFunction;
+    _dedupConfig = dedupConfig;
+    try {
+      byte[] id = (tableNameWithType + "@" + partitionId).getBytes();
+      _keyValueStore = StringUtils.isEmpty(dedupConfig.getKeyStore())
+              ? new ConcurrentHashMapKeyValueStore(id)
+              : PluginManager.get().createInstance(
+                      dedupConfig.getKeyStore(),
+                      new Class[]{byte[].class},
+                      new Object[]{id}
+              );
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public void addSegment(IndexSegment segment) {
-    // Add all PKs to _primaryKeyToSegmentMap
     Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
+    byte[] serializedSegment = serializeSegment(segment);
+    List<Pair<byte[], byte[]>> keyValues = new ArrayList<>();
     while (primaryKeyIterator.hasNext()) {
       PrimaryKey pk = primaryKeyIterator.next();
-      _primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction), segment);
+      byte[] serializedPrimaryKey = serializePrimaryKey(HashUtils.hashPrimaryKey(pk, _dedupConfig.getHashFunction()));
+      keyValues.add(Pair.of(serializedPrimaryKey, serializedSegment));
     }
+    _keyValueStore.putBatch(keyValues);
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
-        _primaryKeyToSegmentMap.size());
+        _keyValueStore.getKeyCount());
+  }
+
+  @VisibleForTesting
+  static byte[] serializeSegment(IndexSegment segment) {
+    return segment.getSegmentName().getBytes();
+  }
+
+  @VisibleForTesting
+  static byte[] serializePrimaryKey(Object pk) {
+    if (pk instanceof PrimaryKey) {
+      return ((PrimaryKey) pk).asBytes();
+    }
+    if (pk instanceof ByteArray) {
+      return ((ByteArray) pk).getBytes();
+    }
+    throw new RuntimeException("Invalid primary key: " + pk);

Review Comment:
   Will make the change.
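
   For context, the constructor in the hunk above chooses the store implementation from a class name in the dedup config and falls back to the in-memory map when none is set. A minimal sketch of that selection follows, assuming plain reflection in place of Pinot's PluginManager. SimpleStore, InMemoryStore and StoreFactory are hypothetical names for illustration and are not part of the PR; the explicit UTF-8 charset is likewise a choice made for this sketch.

```java
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down stand-in for the PR's LocalKeyValueStore interface.
interface SimpleStore {
  void put(String key, String value);

  String get(String key);

  long getKeyCount();
}

// Default in-memory implementation; the id is accepted only to match the constructor contract.
final class InMemoryStore implements SimpleStore {
  private final Map<String, String> _map = new ConcurrentHashMap<>();

  public InMemoryStore(byte[] id) {
  }

  @Override
  public void put(String key, String value) {
    _map.put(key, value);
  }

  @Override
  public String get(String key) {
    return _map.get(key);
  }

  @Override
  public long getKeyCount() {
    return _map.size();
  }
}

final class StoreFactory {
  // Returns the default store when no class name is configured; otherwise
  // instantiates the named class through its (byte[]) constructor.
  static SimpleStore create(String className, byte[] id) {
    if (className == null || className.isEmpty()) {
      return new InMemoryStore(id);
    }
    try {
      Constructor<?> ctor = Class.forName(className).getDeclaredConstructor(byte[].class);
      // Cast to Object so the byte[] is passed as a single constructor argument.
      return (SimpleStore) ctor.newInstance((Object) id);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create key-value store: " + className, e);
    }
  }

  public static void main(String[] args) {
    byte[] id = ("myTable_REALTIME" + "@" + 0).getBytes(StandardCharsets.UTF_8);
    SimpleStore store = create(InMemoryStore.class.getName(), id);
    store.put("pk-1", "segment_0");
    System.out.println(store.get("pk-1") + " keys=" + store.getKeyCount());
  }
}
```

   Wrapping the reflective failure in an exception that names the offending class makes a misconfigured store easier to diagnose than a bare RuntimeException.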





[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1171727096


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java:
##########
@@ -41,4 +48,8 @@ public HashFunction getHashFunction() {
   public boolean isDedupEnabled() {
     return _dedupEnabled;
   }
+
+  public String getKeyStore() {
+    return _keyStore;

Review Comment:
   For @JsonProperty, if a default value has not been specified, it is taken to be the empty string.





[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1170633512


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStore.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+
+public class ConcurrentHashMapKeyValueStore implements LocalKeyValueStore {
+
+  private final Map<ByteArray, byte[]> _map;
+
+  public ConcurrentHashMapKeyValueStore(byte[] id) {
+    _map = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return _map.get(new ByteArray(key));
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    _map.remove(new ByteArray(key));
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    _map.put(new ByteArray(key), value);
+  }
+
+  @Override
+  public void putBatch(List<Pair<byte[], byte[]>> keyValues) {
+    keyValues.forEach(pair -> _map.put(new ByteArray(pair.getKey()), pair.getValue()));
+  }
+
+  @Override
+  public long getKeyCount() {
+    return _map.size();
+  }
+
+  @Override
+  @VisibleForTesting
+  public void compact() {
+  }
+
+  private static final class ByteArray {

Review Comment:
   We need this because Java arrays such as byte[] inherit hashCode() and equals() from java.lang.Object, which compare by identity rather than by content, so a raw array does not work as a HashMap key.
   https://docs.oracle.com/javase/8/docs/api/java/lang/reflect/Array.html
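
   A minimal sketch of the difference, assuming a wrapper that delegates to java.util.Arrays. BytesKey and BytesKeyDemo are hypothetical names for illustration; the PR's own ByteArray class is not shown in full here and may differ.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper that gives a byte[] content-based equality and hashing.
final class BytesKey {
  private final byte[] _bytes;

  BytesKey(byte[] bytes) {
    // Defensive copy so later mutation of the caller's array cannot change the key.
    _bytes = bytes.clone();
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof BytesKey && Arrays.equals(_bytes, ((BytesKey) o)._bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(_bytes);
  }
}

final class BytesKeyDemo {
  // Looks up a content-equal but distinct raw array: misses, because arrays compare by identity.
  static String lookupWithRawArray() {
    Map<byte[], String> raw = new HashMap<>();
    raw.put(new byte[]{1, 2, 3}, "segment_0");
    return raw.get(new byte[]{1, 2, 3});
  }

  // Same lookup through the wrapper: hits, because equality is content-based.
  static String lookupWithWrapper() {
    Map<BytesKey, String> wrapped = new HashMap<>();
    wrapped.put(new BytesKey(new byte[]{1, 2, 3}), "segment_0");
    return wrapped.get(new BytesKey(new byte[]{1, 2, 3}));
  }

  public static void main(String[] args) {
    System.out.println("raw byte[] key: " + lookupWithRawArray());
    System.out.println("wrapped key:    " + lookupWithWrapper());
  }
}
```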





[GitHub] [pinot] raghavgautam commented on a diff in pull request #10639: changing the dedup store to become pluggable

Posted by "raghavgautam (via GitHub)" <gi...@apache.org>.
raghavgautam commented on code in PR #10639:
URL: https://github.com/apache/pinot/pull/10639#discussion_r1172061043


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentHashMapKeyValueStore.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.ingestion.dedup.LocalKeyValueStore;
+
+public class ConcurrentHashMapKeyValueStore implements LocalKeyValueStore {
+
+  private final Map<ByteArray, byte[]> _map;
+
+  public ConcurrentHashMapKeyValueStore(byte[] id) {
+    _map = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return _map.get(new ByteArray(key));
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    _map.remove(new ByteArray(key));
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    _map.put(new ByteArray(key), value);
+  }
+
+  @Override
+  public void putBatch(List<Pair<byte[], byte[]>> keyValues) {
+    keyValues.forEach(pair -> _map.put(new ByteArray(pair.getKey()), pair.getValue()));
+  }
+
+  @Override
+  public long getKeyCount() {
+    return _map.size();
+  }
+
+  @Override
+  @VisibleForTesting
+  public void compact() {
+  }
+
+  private static final class ByteArray {

Review Comment:
   Will make the change.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java:
##########
@@ -41,4 +48,8 @@ public HashFunction getHashFunction() {
   public boolean isDedupEnabled() {
     return _dedupEnabled;
   }
+
+  public String getKeyStore() {
+    return _keyStore;

Review Comment:
   Will make the change.


