You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/12 20:13:07 UTC
[incubator-pinot] 08/12: Add retries to RocksDB reads and writes
This is an automated email from the ASF dual-hosted git repository.
jamesshao pushed a commit to branch upsert
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit fea13a01fd24b2ab143a397c9a2a8d916759e935
Author: Bo Zhang <bz...@uber.com>
AuthorDate: Mon Nov 4 11:36:24 2019 -0800
Add retries to RocksDB reads and writes
Reviewers: #streaming_pinot, sjames
Reviewed By: #streaming_pinot, sjames
Subscribers: sjames
Maniphest Tasks: T4289467
Differential Revision: https://code.uberinternal.com/D3545295
---
.../common/keyValueStore/KeyValueStoreTable.java | 2 -
pinot-grigio/pinot-grigio-coordinator/pom.xml | 3 --
.../common/keyValueStore/RocksDBBatchReader.java | 62 ++++++++++++++++++++++
.../common/keyValueStore/RocksDBBatchWriter.java | 56 +++++++++++++++++++
.../keyValueStore/RocksDBKeyValueStoreTable.java | 31 +++--------
5 files changed, 126 insertions(+), 28 deletions(-)
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/keyValueStore/KeyValueStoreTable.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/keyValueStore/KeyValueStoreTable.java
index 5145fd8..d39eae7 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/keyValueStore/KeyValueStoreTable.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/keyValueStore/KeyValueStoreTable.java
@@ -26,8 +26,6 @@ public interface KeyValueStoreTable<K, V> {
Map<K, V> multiGet(List<K> keys) throws IOException;
- void multiPut(List<K> keys, List<V> values) throws IOException;
-
void multiPut(Map<K, V> keyValuePairs) throws IOException;
void deleteTable() throws IOException;
diff --git a/pinot-grigio/pinot-grigio-coordinator/pom.xml b/pinot-grigio/pinot-grigio-coordinator/pom.xml
index 19499d1..f5b9527 100644
--- a/pinot-grigio/pinot-grigio-coordinator/pom.xml
+++ b/pinot-grigio/pinot-grigio-coordinator/pom.xml
@@ -63,7 +63,6 @@
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
- <version>1.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -102,6 +101,4 @@
<scope>test</scope>
</dependency>
</dependencies>
-
-
</project>
\ No newline at end of file
diff --git a/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchReader.java b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchReader.java
new file mode 100644
index 0000000..3512414
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchReader.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.grigio.common.keyValueStore;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * One-shot {@link Callable} that fetches a batch of keys from a {@link RocksDB} instance with
+ * {@link RocksDB#multiGet(List)}.
+ *
+ * <p>{@link #call()} returns {@code true} on success and {@code false} when RocksDB throws a
+ * {@link RocksDBException}, so a retry policy that re-runs the callable on a {@code false}
+ * result can re-attempt the read. The values from the successful read are then available from
+ * {@link #getResult()}.
+ *
+ * <p>NOTE(review): this class is not synchronized; it presumably expects {@link #call()} and
+ * {@link #getResult()} to run on the same thread - confirm against the retry policy in use.
+ */
+public class RocksDBBatchReader implements Callable<Boolean> {
+  /** Suggested maximum number of read attempts when retrying this reader. */
+  static final int MAX_RETRY_ATTEMPTS = 3;
+  /** Suggested delay, in milliseconds, between read attempts. */
+  static final long RETRY_WAIT_MS = 1000L;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBBatchReader.class);
+
+  private final RocksDB _rocksDB;
+  private final List<byte[]> _byteKeys;
+  // Remains null until call() succeeds; getResult() relies on that to detect a missing read.
+  private Map<byte[], byte[]> _result;
+
+  /**
+   * @param rocksDB database to read from
+   * @param byteKeys raw keys to look up; the list is stored as-is, not copied
+   */
+  RocksDBBatchReader(RocksDB rocksDB, List<byte[]> byteKeys) {
+    _rocksDB = rocksDB;
+    _byteKeys = byteKeys;
+  }
+
+  /**
+   * Returns the map produced by the most recent successful {@link #call()}.
+   *
+   * @throws IllegalStateException if no read has succeeded yet
+   */
+  Map<byte[], byte[]> getResult() {
+    if (_result == null) {
+      // IllegalStateException pinpoints "called before a successful read" and, being a
+      // RuntimeException, stays compatible with existing callers.
+      throw new IllegalStateException("No data got from RocksDB yet!");
+    }
+    return _result;
+  }
+
+  /**
+   * Reads every key in a single {@link RocksDB#multiGet(List)} call.
+   *
+   * @return {@code true} if the read succeeded and {@link #getResult()} is populated,
+   *     {@code false} if RocksDB threw a {@link RocksDBException}
+   */
+  @Override
+  public Boolean call() {
+    try {
+      _result = _rocksDB.multiGet(_byteKeys);
+      return true;
+    } catch (RocksDBException e) {
+      // Log the batch size so a failed attempt can be correlated with its request.
+      LOGGER.warn("Failed to read {} keys from RocksDB", _byteKeys.size(), e);
+      return false;
+    }
+  }
+}
diff --git a/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchWriter.java b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchWriter.java
new file mode 100644
index 0000000..a910c02
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBBatchWriter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.grigio.common.keyValueStore;
+
+import java.util.concurrent.Callable;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link Callable} that applies a prepared {@link WriteBatch} to a {@link RocksDB} instance with a
+ * single {@link RocksDB#write(WriteOptions, WriteBatch)} call.
+ *
+ * <p>A RocksDB failure is logged and reported as a {@code false} result rather than thrown, so a
+ * retry policy that re-runs the callable on {@code false} can re-attempt the write. Any other
+ * exception propagates to the caller unchanged.
+ *
+ * <p>NOTE(review): the batch is stored by reference and never closed here; presumably its owner
+ * releases it once the write has finished - confirm at the call site.
+ */
+public class RocksDBBatchWriter implements Callable<Boolean> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBBatchWriter.class);
+
+  /** Suggested maximum number of write attempts when retrying this writer. */
+  static final int MAX_RETRY_ATTEMPTS = 3;
+  /** Suggested delay, in milliseconds, between write attempts. */
+  static final long RETRY_WAIT_MS = 1000L;
+
+  private final RocksDB _rocksDB;
+  private final WriteOptions _writeOptions;
+  private final WriteBatch _writeBatch;
+
+  /**
+   * @param db database the batch is written to
+   * @param options write options passed to every attempt
+   * @param writeBatch updates to apply; stored by reference
+   */
+  public RocksDBBatchWriter(RocksDB db, WriteOptions options, WriteBatch writeBatch) {
+    this._rocksDB = db;
+    this._writeOptions = options;
+    this._writeBatch = writeBatch;
+  }
+
+  /**
+   * Performs one write attempt.
+   *
+   * @return {@code true} if RocksDB accepted the batch, {@code false} if it threw a
+   *     {@link RocksDBException}
+   */
+  @Override
+  public Boolean call() {
+    boolean written;
+    try {
+      _rocksDB.write(_writeOptions, _writeBatch);
+      written = true;
+    } catch (RocksDBException e) {
+      LOGGER.warn("Failed to write to RocksDB: ", e);
+      written = false;
+    }
+    return written;
+  }
+}
diff --git a/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBKeyValueStoreTable.java b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBKeyValueStoreTable.java
index 814f392..a712360 100644
--- a/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBKeyValueStoreTable.java
+++ b/pinot-grigio/pinot-grigio-coordinator/src/main/java/org/apache/pinot/grigio/common/keyValueStore/RocksDBKeyValueStoreTable.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.grigio.common.keyValueStore;
-import com.google.common.base.Preconditions;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.grigio.common.messages.KeyCoordinatorMessageContext;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
@@ -65,7 +65,9 @@ public class RocksDBKeyValueStoreTable implements KeyValueStoreTable<ByteArrayWr
public Map<ByteArrayWrapper, KeyCoordinatorMessageContext> multiGet(List<ByteArrayWrapper> keys) throws IOException {
try {
List<byte[]> byteKeys = keys.stream().map(ByteArrayWrapper::getData).collect(Collectors.toList());
- Map<byte[], byte[]> rocksdbResult = _db.multiGet(byteKeys);
+ RocksDBBatchReader batchReader = new RocksDBBatchReader(_db, byteKeys);
+ RetryPolicies.fixedDelayRetryPolicy(RocksDBBatchReader.MAX_RETRY_ATTEMPTS, RocksDBBatchReader.RETRY_WAIT_MS).attempt(batchReader);
+ Map<byte[], byte[]> rocksdbResult = batchReader.getResult();
Map<ByteArrayWrapper, KeyCoordinatorMessageContext> result = new HashMap<>(rocksdbResult.size());
for (Map.Entry<byte[], byte[]> entry : rocksdbResult.entrySet()) {
Optional<KeyCoordinatorMessageContext> value = KeyCoordinatorMessageContext.fromBytes(entry.getValue());
@@ -76,30 +78,12 @@ public class RocksDBKeyValueStoreTable implements KeyValueStoreTable<ByteArrayWr
}
}
return result;
- } catch (RocksDBException e) {
+ } catch (Exception e) {
throw new IOException("failed to get keys from rocksdb " + _path, e);
}
}
@Override
- public void multiPut(List<ByteArrayWrapper> keys, List<KeyCoordinatorMessageContext> values) throws IOException {
- Preconditions.checkState(keys.size() == values.size(),
- "keys size {} does not match values size {}", keys.size(), values.size());
- if (keys.size() == 0) {
- return;
- }
- final WriteBatch batch = new WriteBatch();
- try {
- for (int i = 0; i < keys.size(); i++) {
- batch.put(keys.get(i).getData(), values.get(i).toBytes());
- }
- _db.write(_writeOptions, batch);
- } catch (RocksDBException e) {
- throw new IOException("failed to put data to rocksdb table " + _path, e);
- }
- }
-
- @Override
public void multiPut(Map<ByteArrayWrapper, KeyCoordinatorMessageContext> keyValuePairs) throws IOException {
if (keyValuePairs.size() == 0) {
return;
@@ -109,8 +93,9 @@ public class RocksDBKeyValueStoreTable implements KeyValueStoreTable<ByteArrayWr
for (Map.Entry<ByteArrayWrapper, KeyCoordinatorMessageContext> entry: keyValuePairs.entrySet()) {
batch.put(entry.getKey().getData(), entry.getValue().toBytes());
}
- _db.write(_writeOptions, batch);
- } catch (RocksDBException e) {
+ RocksDBBatchWriter batchWriter = new RocksDBBatchWriter(_db, _writeOptions, batch);
+ RetryPolicies.fixedDelayRetryPolicy(RocksDBBatchWriter.MAX_RETRY_ATTEMPTS, RocksDBBatchWriter.RETRY_WAIT_MS).attempt(batchWriter);
+ } catch (Exception e) {
throw new IOException("failed to put data to rocksdb table " + _path, e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org