You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/12 20:13:07 UTC

[incubator-pinot] 08/12: Add retries to RocksDB reads and writes

This is an automated email from the ASF dual-hosted git repository.

jamesshao pushed a commit to branch upsert
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit fea13a01fd24b2ab143a397c9a2a8d916759e935
Author: Bo Zhang <bz...@uber.com>
AuthorDate: Mon Nov 4 11:36:24 2019 -0800

    Add retries to RocksDB reads and writes
    
    Reviewers: #streaming_pinot, sjames
    
    Reviewed By: #streaming_pinot, sjames
    
    Subscribers: sjames
    
    Maniphest Tasks: T4289467
    
    Differential Revision: https://code.uberinternal.com/D3545295
---
 .../common/keyValueStore/KeyValueStoreTable.java   |  2 -
 pinot-grigio/pinot-grigio-coordinator/pom.xml      |  3 --
 .../common/keyValueStore/RocksDBBatchReader.java   | 62 ++++++++++++++++++++++
 .../common/keyValueStore/RocksDBBatchWriter.java   | 56 +++++++++++++++++++
 .../keyValueStore/RocksDBKeyValueStoreTable.java   | 31 +++--------
 5 files changed, 126 insertions(+), 28 deletions(-)

diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/keyValueStore/KeyValueStoreTable.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/keyValueStore/KeyValueStoreTable.java
index 5145fd8..d39eae7 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/keyValueStore/KeyValueStoreTable.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/keyValueStore/KeyValueStoreTable.java
@@ -26,8 +26,6 @@ public interface KeyValueStoreTable<K, V> {
 
   Map<K, V> multiGet(List<K> keys) throws IOException;
 
-  void multiPut(List<K> keys, List<V> values) throws IOException;
-
   void multiPut(Map<K, V> keyValuePairs) throws IOException;
 
   void deleteTable() throws IOException;
diff --git a/pinot-grigio/pinot-grigio-coordinator/pom.xml b/pinot-grigio/pinot-grigio-coordinator/pom.xml
index 19499d1..f5b9527 100644
--- a/pinot-grigio/pinot-grigio-coordinator/pom.xml
+++ b/pinot-grigio/pinot-grigio-coordinator/pom.xml
@@ -63,7 +63,6 @@
     <dependency>
       <groupId>commons-configuration</groupId>
       <artifactId>commons-configuration</artifactId>
-      <version>1.6</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
@@ -102,6 +101,4 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
-
-
 </project>
\ No newline at end of file
diff --git a/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchReader.java b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchReader.java
new file mode 100644
index 0000000..3512414
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchReader.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.grigio.common.keyValueStore;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link Callable} that reads a batch of keys from RocksDB with one {@code multiGet} call, so the
+ * read can be handed to a retry policy.
+ *
+ * <p>{@link #call()} returns {@code true} on success and {@code false} when RocksDB throws a
+ * {@link RocksDBException}, which lets the retry policy try again. After a successful call the
+ * fetched entries are available from {@link #getResult()}.
+ */
+public class RocksDBBatchReader implements Callable<Boolean> {
+  /** Number of attempts callers hand to the fixed-delay retry policy. */
+  static final int MAX_RETRY_ATTEMPTS = 3;
+  /** Delay between attempts, in milliseconds. */
+  static final long RETRY_WAIT_MS = 1000L;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBBatchReader.class);
+
+  private final RocksDB _rocksDB;
+  private final List<byte[]> _byteKeys;
+  // Set only by a successful call(); null until then.
+  private Map<byte[], byte[]> _result;
+
+  RocksDBBatchReader(RocksDB rocksDB, List<byte[]> byteKeys) {
+    _rocksDB = rocksDB;
+    _byteKeys = byteKeys;
+  }
+
+  /**
+   * Returns the entries fetched by the last successful {@link #call()}.
+   *
+   * @return the map returned by {@code RocksDB.multiGet}
+   * @throws IllegalStateException if no read has succeeded yet
+   */
+  Map<byte[], byte[]> getResult() {
+    if (_result == null) {
+      throw new IllegalStateException("No data got from RocksDB yet!");
+    }
+    return _result;
+  }
+
+  /**
+   * Reads all keys with a single {@code multiGet} call.
+   *
+   * @return {@code true} if the read succeeded, {@code false} if RocksDB threw a
+   *     {@link RocksDBException} (logged, not rethrown)
+   */
+  @Override
+  public Boolean call() {
+    try {
+      _result = _rocksDB.multiGet(_byteKeys);
+      return true;
+    } catch (RocksDBException e) {
+      LOGGER.warn("Failed to read from RocksDB: ", e);
+      return false;
+    }
+  }
+}
diff --git a/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchWriter.java b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchWriter.java
new file mode 100644
index 0000000..a910c02
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchWriter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.grigio.common.keyValueStore;
+
+import java.util.concurrent.Callable;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link Callable} that applies one {@link WriteBatch} to RocksDB, so the write can be handed to a
+ * retry policy.
+ *
+ * <p>{@link #call()} reports success as {@code true}; a {@link RocksDBException} is logged and
+ * reported as {@code false} so that the retry policy can try again.
+ */
+public class RocksDBBatchWriter implements Callable<Boolean> {
+  /** Number of attempts callers hand to the fixed-delay retry policy. */
+  static final int MAX_RETRY_ATTEMPTS = 3;
+  /** Delay between attempts, in milliseconds. */
+  static final long RETRY_WAIT_MS = 1000L;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBBatchWriter.class);
+
+  private final RocksDB _db;
+  private final WriteOptions _writeOptions;
+  // This class never closes the batch; NOTE(review): presumably the caller owns it — confirm.
+  private final WriteBatch _writeBatch;
+
+  /**
+   * Creates a writer that applies {@code batch} with {@code writeOptions} on every attempt.
+   *
+   * @param rocksDB database to write to
+   * @param writeOptions options passed to each write
+   * @param batch batch written by each call
+   */
+  public RocksDBBatchWriter(RocksDB rocksDB, WriteOptions writeOptions, WriteBatch batch) {
+    _db = rocksDB;
+    _writeOptions = writeOptions;
+    _writeBatch = batch;
+  }
+
+  /**
+   * Writes the batch once.
+   *
+   * @return {@code true} on success, {@code false} if RocksDB threw (logged, not rethrown)
+   */
+  @Override
+  public Boolean call() {
+    boolean succeeded;
+    try {
+      _db.write(_writeOptions, _writeBatch);
+      succeeded = true;
+    } catch (RocksDBException e) {
+      LOGGER.warn("Failed to write to RocksDB: ", e);
+      succeeded = false;
+    }
+    return succeeded;
+  }
+}
diff --git a/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBKeyValueStoreTable.java b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBKeyValueStoreTable.java
index 814f392..a712360 100644
--- a/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBKeyValueStoreTable.java
+++ b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBKeyValueStoreTable.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.grigio.common.keyValueStore;
 
-import com.google.common.base.Preconditions;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.grigio.common.messages.KeyCoordinatorMessageContext;
 import org.rocksdb.Options;
 import org.rocksdb.ReadOptions;
@@ -65,7 +65,9 @@ public class RocksDBKeyValueStoreTable implements KeyValueStoreTable<ByteArrayWr
   public Map<ByteArrayWrapper, KeyCoordinatorMessageContext> multiGet(List<ByteArrayWrapper> keys) throws IOException {
     try {
       List<byte[]> byteKeys = keys.stream().map(ByteArrayWrapper::getData).collect(Collectors.toList());
-      Map<byte[], byte[]> rocksdbResult = _db.multiGet(byteKeys);
+      RocksDBBatchReader batchReader = new RocksDBBatchReader(_db, byteKeys);
+      RetryPolicies.fixedDelayRetryPolicy(RocksDBBatchReader.MAX_RETRY_ATTEMPTS, RocksDBBatchReader.RETRY_WAIT_MS).attempt(batchReader);
+      Map<byte[], byte[]> rocksdbResult = batchReader.getResult();
       Map<ByteArrayWrapper, KeyCoordinatorMessageContext> result = new HashMap<>(rocksdbResult.size());
       for (Map.Entry<byte[], byte[]> entry : rocksdbResult.entrySet()) {
         Optional<KeyCoordinatorMessageContext> value = KeyCoordinatorMessageContext.fromBytes(entry.getValue());
@@ -76,30 +78,12 @@ public class RocksDBKeyValueStoreTable implements KeyValueStoreTable<ByteArrayWr
         }
       }
       return result;
-    } catch (RocksDBException e) {
+    } catch (Exception e) {
       throw new IOException("failed to get keys from rocksdb " + _path, e);
     }
   }
 
   @Override
-  public void multiPut(List<ByteArrayWrapper> keys, List<KeyCoordinatorMessageContext> values) throws IOException {
-    Preconditions.checkState(keys.size() == values.size(),
-        "keys size {} does not match values size {}", keys.size(), values.size());
-    if (keys.size() == 0) {
-      return;
-    }
-    final WriteBatch batch = new WriteBatch();
-    try {
-      for (int i = 0; i < keys.size(); i++) {
-        batch.put(keys.get(i).getData(), values.get(i).toBytes());
-      }
-      _db.write(_writeOptions, batch);
-    } catch (RocksDBException e) {
-      throw new IOException("failed to put data to rocksdb table " + _path, e);
-    }
-  }
-
-  @Override
   public void multiPut(Map<ByteArrayWrapper, KeyCoordinatorMessageContext> keyValuePairs) throws IOException {
     if (keyValuePairs.size() == 0) {
       return;
@@ -109,8 +93,9 @@ public class RocksDBKeyValueStoreTable implements KeyValueStoreTable<ByteArrayWr
       for (Map.Entry<ByteArrayWrapper, KeyCoordinatorMessageContext> entry: keyValuePairs.entrySet()) {
         batch.put(entry.getKey().getData(), entry.getValue().toBytes());
       }
-      _db.write(_writeOptions, batch);
-    } catch (RocksDBException e) {
+      RocksDBBatchWriter batchWriter = new RocksDBBatchWriter(_db, _writeOptions, batch);
+      RetryPolicies.fixedDelayRetryPolicy(RocksDBBatchWriter.MAX_RETRY_ATTEMPTS, RocksDBBatchWriter.RETRY_WAIT_MS).attempt(batchWriter);
+    } catch (Exception e) {
       throw new IOException("failed to put data to rocksdb table " + _path, e);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org