You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/03 21:33:11 UTC
[1/2] beam git commit: Retry on correct error codes for datastoreio
Repository: beam
Updated Branches:
refs/heads/master f55d00253 -> 9b6b9060b
Retry on correct error codes for datastoreio
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8e2522e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8e2522e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8e2522e
Branch: refs/heads/master
Commit: d8e2522eb04a2a0b5cb28415e55d467d8905d841
Parents: f55d002
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed May 3 13:14:20 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed May 3 14:32:59 2017 -0700
----------------------------------------------------------------------
.../apache_beam/io/gcp/datastore/v1/helper.py | 16 ++++++++-----
.../io/gcp/datastore/v1/helper_test.py | 24 +++++++++++++++-----
2 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d8e2522e/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index d544226..a61884f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -24,13 +24,13 @@ try:
from google.cloud.proto.datastore.v1 import datastore_pb2
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
+ from google.rpc import code_pb2
from googledatastore import PropertyFilter, CompositeFilter
from googledatastore import helper as datastore_helper
from googledatastore.connection import Datastore
from googledatastore.connection import RPCError
- QUERY_NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED
except ImportError:
- QUERY_NOT_FINISHED = None
+ pass
# pylint: enable=wrong-import-order, wrong-import-position
from apache_beam.internal.gcp import auth
@@ -129,8 +129,12 @@ def make_partition(project, namespace):
def retry_on_rpc_error(exception):
"""A retry filter for Cloud Datastore RPCErrors."""
if isinstance(exception, RPCError):
- return exception.code >= 500
- # TODO(vikasrk): Figure out what other errors should be retried.
+ err_code = exception.code
+ # TODO(BEAM-2156): put these codes in a global list and use that instead.
+ return (err_code == code_pb2.DEADLINE_EXCEEDED or
+ err_code == code_pb2.UNAVAILABLE or
+ err_code == code_pb2.UNKNOWN or
+ err_code == code_pb2.INTERNAL)
return False
@@ -221,7 +225,6 @@ class QueryIterator(object):
Entities are read in batches. Retries on failures.
"""
- _NOT_FINISHED = QUERY_NOT_FINISHED
# Maximum number of results to request per query.
_BATCH_SIZE = 500
@@ -265,4 +268,5 @@ class QueryIterator(object):
# read).
more_results = ((self._limit > 0) and
((num_results == self._BATCH_SIZE) or
- (resp.batch.more_results == self._NOT_FINISHED)))
+ (resp.batch.more_results ==
+ query_pb2.QueryResultBatch.NOT_FINISHED)))
http://git-wip-us.apache.org/repos/asf/beam/blob/d8e2522e/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 582a5b3..5d4bb6f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -33,6 +33,7 @@ try:
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from google.cloud.proto.datastore.v1.entity_pb2 import Key
+ from google.rpc import code_pb2
from googledatastore.connection import RPCError
from googledatastore import helper as datastore_helper
except ImportError:
@@ -49,19 +50,22 @@ class HelperTest(unittest.TestCase):
self._query.kind.add().name = 'dummy_kind'
patch_retry(self, helper)
- def permanent_datastore_failure(self, req):
- raise RPCError("dummy", 500, "failed")
+ def permanent_retriable_datastore_failure(self, req):
+ raise RPCError("dummy", code_pb2.UNAVAILABLE, "failed")
- def transient_datastore_failure(self, req):
+ def transient_retriable_datastore_failure(self, req):
if self._transient_fail_count:
self._transient_fail_count -= 1
- raise RPCError("dummy", 500, "failed")
+ raise RPCError("dummy", code_pb2.INTERNAL, "failed")
else:
return datastore_pb2.RunQueryResponse()
+ def non_retriable_datastore_failure(self, req):
+ raise RPCError("dummy", code_pb2.UNAUTHENTICATED, "failed")
+
def test_query_iterator(self):
self._mock_datastore.run_query.side_effect = (
- self.permanent_datastore_failure)
+ self.permanent_retriable_datastore_failure)
query_iterator = helper.QueryIterator("project", None, self._query,
self._mock_datastore)
self.assertRaises(RPCError, iter(query_iterator).next)
@@ -69,7 +73,7 @@ class HelperTest(unittest.TestCase):
def test_query_iterator_with_transient_failures(self):
self._mock_datastore.run_query.side_effect = (
- self.transient_datastore_failure)
+ self.transient_retriable_datastore_failure)
query_iterator = helper.QueryIterator("project", None, self._query,
self._mock_datastore)
fail_count = 2
@@ -80,6 +84,14 @@ class HelperTest(unittest.TestCase):
self.assertEqual(fail_count + 1,
len(self._mock_datastore.run_query.call_args_list))
+ def test_query_iterator_with_non_retriable_failures(self):
+ self._mock_datastore.run_query.side_effect = (
+ self.non_retriable_datastore_failure)
+ query_iterator = helper.QueryIterator("project", None, self._query,
+ self._mock_datastore)
+ self.assertRaises(RPCError, iter(query_iterator).next)
+ self.assertEqual(1, len(self._mock_datastore.run_query.call_args_list))
+
def test_query_iterator_with_single_batch(self):
num_entities = 100
batch_size = 500
[2/2] beam git commit: This closes #2865
Posted by al...@apache.org.
This closes #2865
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b6b9060
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b6b9060
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b6b9060
Branch: refs/heads/master
Commit: 9b6b9060b1478073587eb24a134be56d93d69a57
Parents: f55d002 d8e2522
Author: Ahmet Altay <al...@google.com>
Authored: Wed May 3 14:33:01 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed May 3 14:33:01 2017 -0700
----------------------------------------------------------------------
.../apache_beam/io/gcp/datastore/v1/helper.py | 16 ++++++++-----
.../io/gcp/datastore/v1/helper_test.py | 24 +++++++++++++++-----
2 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------