You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/04 00:21:21 UTC
[1/3] incubator-beam git commit: Closes #1250
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk a6e104d6d -> 32d719911
Closes #1250
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32d71991
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32d71991
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32d71991
Branch: refs/heads/python-sdk
Commit: 32d719911f9e0dc44e1a34defdb37c6935f0b25d
Parents: a6e104d 456f926
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Nov 3 14:58:38 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Nov 3 14:58:38 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/bigquery.py | 23 ++++++++++++++-
sdks/python/apache_beam/io/bigquery_test.py | 37 +++++++++++++++++++++---
2 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Document the input and output of
type conversions
Posted by ro...@apache.org.
Document the input and output of type conversions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/456f9263
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/456f9263
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/456f9263
Branch: refs/heads/python-sdk
Commit: 456f9263f67c9fb504b633bf84b757ea1a48ee6e
Parents: d66cb17
Author: Sourabh Bajaj <so...@google.com>
Authored: Wed Nov 2 17:53:34 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Nov 3 14:58:38 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/bigquery.py | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/456f9263/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index b626432..9c1ee27 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -831,27 +831,36 @@ class BigQueryWrapper(object):
# return True for both!).
value = from_json_value(cell.v)
if field.type == 'STRING':
+ # Input: "XYZ" --> Output: "XYZ"
value = value
elif field.type == 'BOOLEAN':
+ # Input: "true" --> Output: True
value = value == 'true'
elif field.type == 'INTEGER':
+ # Input: "123" --> Output: 123
value = int(value)
elif field.type == 'FLOAT':
+ # Input: "1.23" --> Output: 1.23
value = float(value)
elif field.type == 'TIMESTAMP':
# The UTC should come from the timezone library but this is a known
# issue in python 2.7 so we'll just hardcode it as we're reading using
# utcfromtimestamp. This is just to match the output from the dataflow
# runner with the local runner.
+ # Input: 1478134176.985864 --> Output: "2016-11-03 00:49:36.985864 UTC"
dt = datetime.datetime.utcfromtimestamp(float(value))
value = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC')
elif field.type == 'BYTES':
+ # Input: "YmJi" --> Output: "YmJi"
value = value
elif field.type == 'DATE':
+ # Input: "2016-11-03" --> Output: "2016-11-03"
value = value
elif field.type == 'DATETIME':
+ # Input: "2016-11-03T00:49:36" --> Output: "2016-11-03T00:49:36"
value = value
elif field.type == 'TIME':
+ # Input: "00:49:36" --> Output: "00:49:36"
value = value
else:
# Note that a schema field object supports also a RECORD type. However
[3/3] incubator-beam git commit: Make the BQ input for dataflow
runner and local runner identical for time related types
Posted by ro...@apache.org.
Make the BQ input for dataflow runner and local runner identical for time related types
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d66cb174
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d66cb174
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d66cb174
Branch: refs/heads/python-sdk
Commit: d66cb174cd7a1ea54c48ae92a6528f187f36434e
Parents: a6e104d
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Nov 1 13:06:10 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Nov 3 14:58:38 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/bigquery.py | 14 ++++++++-
sdks/python/apache_beam/io/bigquery_test.py | 37 +++++++++++++++++++++---
2 files changed, 46 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d66cb174/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index a3999af..b626432 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -108,6 +108,7 @@ import json
import logging
import re
import time
+import datetime
import uuid
from apitools.base.py.exceptions import HttpError
@@ -838,9 +839,20 @@ class BigQueryWrapper(object):
elif field.type == 'FLOAT':
value = float(value)
elif field.type == 'TIMESTAMP':
- value = float(value)
+ # The UTC should come from the timezone library but this is a known
+ # issue in python 2.7 so we'll just hardcode it as we're reading using
+ # utcfromtimestamp. This is just to match the output from the dataflow
+ # runner with the local runner.
+ dt = datetime.datetime.utcfromtimestamp(float(value))
+ value = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC')
elif field.type == 'BYTES':
value = value
+ elif field.type == 'DATE':
+ value = value
+ elif field.type == 'DATETIME':
+ value = value
+ elif field.type == 'TIME':
+ value = value
else:
# Note that a schema field object supports also a RECORD type. However
# when querying, the repeated and/or record fields always come
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d66cb174/sdks/python/apache_beam/io/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index ac71880..86b599c 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -20,6 +20,7 @@
import json
import logging
import time
+import datetime
import unittest
from apitools.base.py.exceptions import HttpError
@@ -183,9 +184,25 @@ class TestBigQueryReader(unittest.TestCase):
def get_test_rows(self):
now = time.time()
+ dt = datetime.datetime.utcfromtimestamp(float(now))
+ ts = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC')
expected_rows = [
- {'i': 1, 's': 'abc', 'f': 2.3, 'b': True, 't': now},
- {'i': 10, 's': 'xyz', 'f': -3.14, 'b': False}]
+ {
+ 'i': 1,
+ 's': 'abc',
+ 'f': 2.3,
+ 'b': True,
+ 't': ts,
+ 'dt': '2016-10-31',
+ 'ts': '22:39:12.627498',
+ 'dt_ts': '2008-12-25T07:30:00'
+ },
+ {
+ 'i': 10,
+ 's': 'xyz',
+ 'f': -3.14,
+ 'b': False
+ }]
schema = bigquery.TableSchema(
fields=[
bigquery.TableFieldSchema(
@@ -197,7 +214,13 @@ class TestBigQueryReader(unittest.TestCase):
bigquery.TableFieldSchema(
name='s', type='STRING', mode='REQUIRED'),
bigquery.TableFieldSchema(
- name='t', type='TIMESTAMP', mode='NULLABLE')])
+ name='t', type='TIMESTAMP', mode='NULLABLE'),
+ bigquery.TableFieldSchema(
+ name='dt', type='DATE', mode='NULLABLE'),
+ bigquery.TableFieldSchema(
+ name='ts', type='TIME', mode='NULLABLE'),
+ bigquery.TableFieldSchema(
+ name='dt_ts', type='DATETIME', mode='NULLABLE')])
table_rows = [
bigquery.TableRow(f=[
bigquery.TableCell(v=to_json_value('true')),
@@ -206,12 +229,18 @@ class TestBigQueryReader(unittest.TestCase):
bigquery.TableCell(v=to_json_value('abc')),
# For timestamps cannot use str() because it will truncate the
# number representing the timestamp.
- bigquery.TableCell(v=to_json_value('%f' % now))]),
+ bigquery.TableCell(v=to_json_value('%f' % now)),
+ bigquery.TableCell(v=to_json_value('2016-10-31')),
+ bigquery.TableCell(v=to_json_value('22:39:12.627498')),
+ bigquery.TableCell(v=to_json_value('2008-12-25T07:30:00'))]),
bigquery.TableRow(f=[
bigquery.TableCell(v=to_json_value('false')),
bigquery.TableCell(v=to_json_value(str(-3.14))),
bigquery.TableCell(v=to_json_value(str(10))),
bigquery.TableCell(v=to_json_value('xyz')),
+ bigquery.TableCell(v=None),
+ bigquery.TableCell(v=None),
+ bigquery.TableCell(v=None),
bigquery.TableCell(v=None)])]
return table_rows, schema, expected_rows