You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/11/01 20:05:20 UTC
[1/2] beam git commit: Updates Python datastore wordcount example to
take a dataset parameter.
Repository: beam
Updated Branches:
refs/heads/master b013d7c5a -> 8247b5346
Updates Python datastore wordcount example to take a dataset parameter.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac436349
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac436349
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac436349
Branch: refs/heads/master
Commit: ac4363491e6c300cf34c05bf547cee3ccc37c98e
Parents: b013d7c
Author: chamikara@google.com <ch...@google.com>
Authored: Tue Oct 31 18:37:29 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Nov 1 13:03:37 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/datastore_wordcount.py | 24 +++++++++++---------
.../io/gcp/datastore/v1/datastoreio.py | 16 +++++++++++--
2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ac436349/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 099fb08..7204e3b 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -32,12 +32,14 @@ counts them and write the output to a set of files.
The following options must be provided to run this pipeline in read-only mode:
``
---project YOUR_PROJECT_ID
+--dataset YOUR_DATASET
--kind YOUR_DATASTORE_KIND
--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
--read_only
``
+For the v1 version of Cloud Datastore, the dataset ID is the same as the project ID.
+
Read-write Mode: In this mode, this example reads words from an input file,
converts them to Cloud Datastore ``Entity`` objects and writes them to
Cloud Datastore using the ``datastoreio.Write`` transform. The second pipeline
@@ -47,7 +49,7 @@ write the output to a set of files.
The following options must be provided to run this pipeline in read-write mode:
``
---project YOUR_PROJECT_ID
+--dataset YOUR_DATASET
--kind YOUR_DATASTORE_KIND
--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
``
@@ -77,7 +79,6 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
-from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
@@ -134,7 +135,7 @@ class EntityWrapper(object):
return entity
-def write_to_datastore(project, user_options, pipeline_options):
+def write_to_datastore(user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
with beam.Pipeline(options=pipeline_options) as p:
@@ -144,7 +145,7 @@ def write_to_datastore(project, user_options, pipeline_options):
| 'create entity' >> beam.Map(
EntityWrapper(user_options.namespace, user_options.kind,
user_options.ancestor).make_entity)
- | 'write to datastore' >> WriteToDatastore(project))
+ | 'write to datastore' >> WriteToDatastore(user_options.dataset))
def make_ancestor_query(kind, namespace, ancestor):
@@ -167,7 +168,7 @@ def make_ancestor_query(kind, namespace, ancestor):
return query
-def read_from_datastore(project, user_options, pipeline_options):
+def read_from_datastore(user_options, pipeline_options):
"""Creates a pipeline that reads entities from Cloud Datastore."""
p = beam.Pipeline(options=pipeline_options)
# Create a query to read entities from datastore.
@@ -176,7 +177,7 @@ def read_from_datastore(project, user_options, pipeline_options):
# Read entities from Cloud Datastore into a PCollection.
lines = p | 'read from datastore' >> ReadFromDatastore(
- project, query, user_options.namespace)
+ user_options.dataset, query, user_options.namespace)
# Count the occurrences of each word.
def count_ones(word_ones):
@@ -216,6 +217,9 @@ def run(argv=None):
dest='input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
+ parser.add_argument('--dataset',
+ dest='dataset',
+ help='Dataset ID to read from Cloud Datastore.')
parser.add_argument('--kind',
dest='kind',
required=True,
@@ -246,15 +250,13 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
# Write to Datastore if `read_only` options is not specified.
if not known_args.read_only:
- write_to_datastore(gcloud_options.project, known_args, pipeline_options)
+ write_to_datastore(known_args, pipeline_options)
# Read entities from Datastore.
- result = read_from_datastore(gcloud_options.project, known_args,
- pipeline_options)
+ result = read_from_datastore(known_args, pipeline_options)
empty_lines_filter = MetricsFilter().with_name('empty_lines')
query_result = result.metrics().query(empty_lines_filter)
http://git-wip-us.apache.org/repos/asf/beam/blob/ac436349/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index 078002c..13209c1 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -89,10 +89,10 @@ class ReadFromDatastore(PTransform):
_DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024
def __init__(self, project, query, namespace=None, num_splits=0):
- """Initialize the ReadFromDatastore transform.
+ """Initialize the `ReadFromDatastore` transform.
Args:
- project: The Project ID
+ project: The ID of the project to read from.
query: Cloud Datastore query to be read from.
namespace: An optional namespace.
num_splits: Number of splits for the query.
@@ -459,7 +459,13 @@ class _Mutate(PTransform):
class WriteToDatastore(_Mutate):
"""A ``PTransform`` to write a ``PCollection[Entity]`` to Cloud Datastore."""
+
def __init__(self, project):
+ """Initialize the `WriteToDatastore` transform.
+
+ Args:
+ project: The ID of the project to write to.
+ """
# Import here to avoid adding the dependency for local running scenarios.
try:
@@ -486,6 +492,12 @@ class WriteToDatastore(_Mutate):
class DeleteFromDatastore(_Mutate):
"""A ``PTransform`` to delete a ``PCollection[Key]`` from Cloud Datastore."""
def __init__(self, project):
+ """Initialize the `DeleteFromDatastore` transform.
+
+ Args:
+ project: The ID of the project from which the entities will be deleted.
+ """
+
super(DeleteFromDatastore, self).__init__(
project, DeleteFromDatastore.to_delete_mutation)
[2/2] beam git commit: This closes #4067
Posted by ch...@apache.org.
This closes #4067
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8247b534
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8247b534
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8247b534
Branch: refs/heads/master
Commit: 8247b534613eabed7fff4b62c6afc6603b84031c
Parents: b013d7c ac43634
Author: chamikara@google.com <ch...@google.com>
Authored: Wed Nov 1 13:04:58 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Nov 1 13:04:58 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/datastore_wordcount.py | 24 +++++++++++---------
.../io/gcp/datastore/v1/datastoreio.py | 16 +++++++++++--
2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------