You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:54 UTC
[27/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
Revert "[BEAM-2610] This closes #3553"
This reverts commit ec494f675aa73fbdc7929f9592f33951941962b0, reversing
changes made to d89d1ee1a3085269cdf44ec50e29a95c8f43757b.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1b2b96a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1b2b96a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1b2b96a
Branch: refs/heads/DSL_SQL
Commit: c1b2b96a438b86a8b0023f6943dcf0a4f238ba39
Parents: 97a156c
Author: mingmxu <mi...@ebay.com>
Authored: Wed Jul 19 14:26:11 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Wed Jul 19 14:26:11 2017 -0700
----------------------------------------------------------------------
.gitignore | 2 +-
.../jenkins/common_job_properties.groovy | 9 +-
.../job_beam_PerformanceTests_Python.groovy | 58 --
..._beam_PostCommit_Java_JDKVersionsTest.groovy | 2 -
..._PostCommit_Java_MavenInstall_Windows.groovy | 3 +-
.../job_beam_PreCommit_Website_Merge.groovy | 59 --
examples/java/pom.xml | 20 +-
.../examples/common/WriteOneFilePerWindow.java | 52 +-
.../beam/examples/WindowedWordCountIT.java | 4 +-
examples/java8/pom.xml | 20 +-
.../complete/game/utils/WriteToText.java | 43 +-
.../examples/complete/game/LeaderBoardTest.java | 2 -
examples/pom.xml | 2 +-
pom.xml | 123 +---
runners/apex/pom.xml | 20 +-
.../apache/beam/runners/apex/ApexRunner.java | 61 +-
.../translation/ApexPipelineTranslator.java | 16 +-
.../apex/translation/TranslationContext.java | 4 +-
.../operators/ApexParDoOperator.java | 21 +-
.../runners/apex/examples/WordCountTest.java | 8 +-
.../utils/ApexStateInternalsTest.java | 411 ++++++++---
runners/core-construction-java/pom.xml | 2 +-
.../CreatePCollectionViewTranslation.java | 4 +-
.../construction/ElementAndRestriction.java | 42 ++
.../ElementAndRestrictionCoder.java | 88 +++
.../construction/PCollectionTranslation.java | 16 -
.../core/construction/PTransformMatchers.java | 109 +--
.../construction/PTransformTranslation.java | 7 +-
.../core/construction/ParDoTranslation.java | 82 +--
.../construction/RunnerPCollectionView.java | 31 +-
.../core/construction/SplittableParDo.java | 124 +---
.../construction/TestStreamTranslation.java | 49 +-
.../core/construction/TransformInputs.java | 50 --
.../WindowingStrategyTranslation.java | 27 +-
.../construction/WriteFilesTranslation.java | 67 +-
.../ElementAndRestrictionCoderTest.java | 126 ++++
.../PCollectionTranslationTest.java | 22 -
.../construction/PTransformMatchersTest.java | 54 +-
.../core/construction/ParDoTranslationTest.java | 28 +-
.../core/construction/SplittableParDoTest.java | 18 +-
.../core/construction/TransformInputsTest.java | 166 -----
.../WindowingStrategyTranslationTest.java | 3 -
.../construction/WriteFilesTranslationTest.java | 62 +-
runners/core-java/pom.xml | 2 +-
.../core/LateDataDroppingDoFnRunner.java | 33 +-
...eBoundedSplittableProcessElementInvoker.java | 40 +-
.../beam/runners/core/ProcessFnRunner.java | 16 +-
.../beam/runners/core/ReduceFnRunner.java | 135 ++--
.../beam/runners/core/SimpleDoFnRunner.java | 20 -
.../core/SplittableParDoViaKeyedWorkItems.java | 58 +-
.../core/SplittableProcessElementInvoker.java | 25 +-
.../beam/runners/core/SystemReduceFn.java | 6 -
.../core/triggers/AfterAllStateMachine.java | 25 +-
.../AfterDelayFromFirstElementStateMachine.java | 6 +-
.../core/triggers/AfterFirstStateMachine.java | 20 +-
.../core/triggers/AfterPaneStateMachine.java | 6 +-
.../triggers/AfterWatermarkStateMachine.java | 7 +-
.../triggers/ExecutableTriggerStateMachine.java | 23 +-
.../core/triggers/NeverStateMachine.java | 5 +-
.../core/triggers/TriggerStateMachine.java | 27 +
.../core/InMemoryStateInternalsTest.java | 569 +++++++++++++--
...ndedSplittableProcessElementInvokerTest.java | 47 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 374 +---------
.../beam/runners/core/ReduceFnTester.java | 48 +-
.../core/SplittableParDoProcessFnTest.java | 117 +--
.../beam/runners/core/StateInternalsTest.java | 613 ----------------
.../beam/runners/core/WindowMatchers.java | 15 -
.../triggers/AfterFirstStateMachineTest.java | 5 +-
.../AfterWatermarkStateMachineTest.java | 7 +-
.../core/triggers/StubTriggerStateMachine.java | 7 +-
runners/direct-java/pom.xml | 7 +-
.../beam/runners/direct/CommittedResult.java | 12 +-
.../apache/beam/runners/direct/DirectGraph.java | 38 +-
.../beam/runners/direct/DirectGraphVisitor.java | 48 +-
.../beam/runners/direct/DirectGroupByKey.java | 13 +-
.../direct/DirectGroupByKeyOverrideFactory.java | 14 +-
.../beam/runners/direct/DirectRegistrar.java | 2 +-
.../beam/runners/direct/DirectRunner.java | 64 +-
.../beam/runners/direct/DirectTestOptions.java | 42 --
.../beam/runners/direct/EvaluationContext.java | 26 +-
.../direct/ExecutorServiceParallelExecutor.java | 27 +-
.../runners/direct/ParDoEvaluatorFactory.java | 9 +-
.../direct/ParDoMultiOverrideFactory.java | 121 +---
...littableProcessElementsEvaluatorFactory.java | 37 +-
.../direct/StatefulParDoEvaluatorFactory.java | 12 +-
.../direct/TestStreamEvaluatorFactory.java | 20 +-
.../runners/direct/ViewEvaluatorFactory.java | 8 +-
.../runners/direct/ViewOverrideFactory.java | 69 +-
.../beam/runners/direct/WatermarkManager.java | 18 +-
.../direct/WriteWithShardingFactory.java | 34 +-
.../runners/direct/CommittedResultTest.java | 17 +-
.../runners/direct/DirectGraphVisitorTest.java | 10 +-
.../beam/runners/direct/DirectGraphs.java | 7 -
.../runners/direct/DirectRegistrarTest.java | 2 +-
.../runners/direct/EvaluationContextTest.java | 7 +-
.../ImmutabilityEnforcementFactoryTest.java | 4 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 7 +-
.../StatefulParDoEvaluatorFactoryTest.java | 65 +-
.../runners/direct/TransformExecutorTest.java | 12 +-
.../direct/ViewEvaluatorFactoryTest.java | 8 +-
.../runners/direct/ViewOverrideFactoryTest.java | 37 +-
.../direct/WatermarkCallbackExecutorTest.java | 1 -
.../runners/direct/WatermarkManagerTest.java | 16 +-
.../direct/WriteWithShardingFactoryTest.java | 44 +-
runners/flink/pom.xml | 11 +-
.../runners/flink/CreateStreamingFlinkView.java | 154 ----
.../flink/FlinkBatchTranslationContext.java | 3 +-
.../FlinkPipelineExecutionEnvironment.java | 2 -
.../flink/FlinkStreamingPipelineTranslator.java | 86 ++-
.../FlinkStreamingTransformTranslators.java | 36 +-
.../flink/FlinkStreamingTranslationContext.java | 3 +-
.../flink/FlinkStreamingViewOverrides.java | 372 ++++++++++
.../runners/flink/FlinkTransformOverrides.java | 53 --
.../streaming/SplittableDoFnOperator.java | 16 +-
.../streaming/state/FlinkStateInternals.java | 425 ++++++-----
.../FlinkBroadcastStateInternalsTest.java | 242 +++++--
.../FlinkKeyGroupStateInternalsTest.java | 359 +++++-----
.../streaming/FlinkSplitStateInternalsTest.java | 132 ++--
.../streaming/FlinkStateInternalsTest.java | 343 ++++++++-
runners/google-cloud-dataflow-java/pom.xml | 10 +-
.../dataflow/BatchStatefulParDoOverrides.java | 4 -
.../runners/dataflow/BatchViewOverrides.java | 182 +++--
.../runners/dataflow/CreateDataflowView.java | 8 +-
.../dataflow/DataflowPipelineTranslator.java | 62 +-
.../beam/runners/dataflow/DataflowRunner.java | 133 +---
.../dataflow/SplittableParDoOverrides.java | 76 --
.../dataflow/StreamingViewOverrides.java | 10 +-
.../runners/dataflow/TransformTranslator.java | 4 +-
.../runners/dataflow/util/PropertyNames.java | 1 -
.../beam/runners/dataflow/util/TimeUtil.java | 24 +-
.../DataflowPipelineTranslatorTest.java | 95 +--
.../runners/dataflow/DataflowRunnerTest.java | 198 +-----
.../runners/dataflow/util/TimeUtilTest.java | 6 -
runners/pom.xml | 2 +-
runners/spark/pom.xml | 70 +-
.../spark/SparkNativePipelineVisitor.java | 3 +-
.../apache/beam/runners/spark/SparkRunner.java | 9 +-
.../beam/runners/spark/TestSparkRunner.java | 2 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 6 +-
.../spark/stateful/SparkTimerInternals.java | 18 +-
.../spark/translation/EvaluationContext.java | 4 +-
.../spark/translation/TransformTranslator.java | 50 +-
.../spark/util/GlobalWatermarkHolder.java | 127 +---
.../spark/GlobalWatermarkHolderTest.java | 18 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 26 +-
.../spark/stateful/SparkStateInternalsTest.java | 66 --
.../spark/translation/StorageLevelTest.java | 4 +-
sdks/common/fn-api/pom.xml | 2 +-
.../fn-api/src/main/proto/beam_fn_api.proto | 237 ++++--
sdks/common/pom.xml | 2 +-
sdks/common/runner-api/pom.xml | 2 +-
.../src/main/proto/beam_runner_api.proto | 26 +-
sdks/java/build-tools/pom.xml | 2 +-
.../src/main/resources/beam/findbugs-filter.xml | 9 -
sdks/java/core/pom.xml | 2 +-
.../apache/beam/sdk/coders/ShardedKeyCoder.java | 66 --
.../java/org/apache/beam/sdk/io/AvroIO.java | 220 +++---
.../java/org/apache/beam/sdk/io/AvroSink.java | 32 +-
.../apache/beam/sdk/io/CompressedSource.java | 40 +-
.../beam/sdk/io/DefaultFilenamePolicy.java | 274 ++-----
.../beam/sdk/io/DynamicFileDestinations.java | 115 ---
.../org/apache/beam/sdk/io/FileBasedSink.java | 513 ++++++-------
.../apache/beam/sdk/io/OffsetBasedSource.java | 22 +-
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 44 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 712 ++++---------------
.../java/org/apache/beam/sdk/io/TextSink.java | 22 +-
.../java/org/apache/beam/sdk/io/WriteFiles.java | 647 ++++++-----------
.../beam/sdk/io/range/ByteKeyRangeTracker.java | 22 +-
.../apache/beam/sdk/io/range/OffsetRange.java | 101 ---
.../beam/sdk/io/range/OffsetRangeTracker.java | 3 -
.../sdk/options/PipelineOptionsFactory.java | 18 +-
.../sdk/options/PipelineOptionsValidator.java | 34 +-
.../sdk/options/ProxyInvocationHandler.java | 19 +-
.../beam/sdk/runners/TransformHierarchy.java | 165 +----
.../apache/beam/sdk/testing/StaticWindows.java | 5 -
.../org/apache/beam/sdk/testing/TestStream.java | 12 -
.../org/apache/beam/sdk/transforms/Combine.java | 30 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 52 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 21 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 41 +-
.../sdk/transforms/SerializableFunctions.java | 50 --
.../org/apache/beam/sdk/transforms/View.java | 38 +-
.../reflect/ByteBuddyDoFnInvokerFactory.java | 27 -
.../reflect/ByteBuddyOnTimerInvokerFactory.java | 73 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 17 +-
.../sdk/transforms/reflect/DoFnSignature.java | 33 +-
.../sdk/transforms/reflect/DoFnSignatures.java | 44 +-
.../reflect/OnTimerMethodSpecifier.java | 37 -
.../transforms/splittabledofn/OffsetRange.java | 77 ++
.../splittabledofn/OffsetRangeTracker.java | 11 -
.../splittabledofn/RestrictionTracker.java | 11 +-
.../sdk/transforms/windowing/GlobalWindows.java | 5 -
.../windowing/PartitioningWindowFn.java | 5 -
.../transforms/windowing/SlidingWindows.java | 5 -
.../beam/sdk/transforms/windowing/Window.java | 32 -
.../beam/sdk/transforms/windowing/WindowFn.java | 11 -
.../apache/beam/sdk/util/IdentityWindowFn.java | 5 -
.../org/apache/beam/sdk/values/PCollection.java | 12 -
.../beam/sdk/values/PCollectionViews.java | 38 -
.../org/apache/beam/sdk/values/PValueBase.java | 12 +
.../org/apache/beam/sdk/values/ShardedKey.java | 65 --
.../beam/sdk/values/WindowingStrategy.java | 46 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 85 +--
.../beam/sdk/io/DefaultFilenamePolicyTest.java | 135 ++--
.../sdk/io/DrunkWritableByteChannelFactory.java | 2 +-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 93 +--
.../java/org/apache/beam/sdk/io/SimpleSink.java | 56 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 326 +--------
.../org/apache/beam/sdk/io/WriteFilesTest.java | 366 ++--------
.../options/PipelineOptionsValidatorTest.java | 44 --
.../sdk/options/ProxyInvocationHandlerTest.java | 19 -
.../sdk/runners/TransformHierarchyTest.java | 197 -----
.../sdk/testing/PCollectionViewTesting.java | 8 -
.../apache/beam/sdk/transforms/CombineTest.java | 365 ++++------
.../beam/sdk/transforms/DoFnTesterTest.java | 32 -
.../beam/sdk/transforms/GroupByKeyTest.java | 39 -
.../apache/beam/sdk/transforms/ParDoTest.java | 165 -----
.../beam/sdk/transforms/SplittableDoFnTest.java | 155 +---
.../transforms/reflect/DoFnInvokersTest.java | 93 +--
.../DoFnSignaturesProcessElementTest.java | 2 +-
.../DoFnSignaturesSplittableDoFnTest.java | 83 +--
.../transforms/reflect/DoFnSignaturesTest.java | 14 -
.../splittabledofn/OffsetRangeTrackerTest.java | 1 -
.../windowing/SlidingWindowsTest.java | 30 +-
.../google-cloud-platform-core/pom.xml | 2 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 2 +-
.../sdk/util/RetryHttpRequestInitializer.java | 147 ++--
.../extensions/gcp/GcpCoreApiSurfaceTest.java | 48 +-
.../util/RetryHttpRequestInitializerTest.java | 31 +-
sdks/java/extensions/jackson/pom.xml | 2 +-
sdks/java/extensions/join-library/pom.xml | 2 +-
sdks/java/extensions/pom.xml | 2 +-
sdks/java/extensions/protobuf/pom.xml | 2 +-
sdks/java/extensions/sorter/pom.xml | 8 +-
sdks/java/harness/pom.xml | 18 +-
.../harness/control/ProcessBundleHandler.java | 295 ++++++--
.../fn/harness/control/RegisterHandler.java | 2 +-
.../beam/runners/core/BeamFnDataReadRunner.java | 70 +-
.../runners/core/BeamFnDataWriteRunner.java | 67 +-
.../beam/runners/core/BoundedSourceRunner.java | 74 +-
.../beam/runners/core/FnApiDoFnRunner.java | 547 --------------
.../runners/core/PTransformRunnerFactory.java | 81 ---
.../control/ProcessBundleHandlerTest.java | 521 ++++++++++++--
.../fn/harness/control/RegisterHandlerTest.java | 8 +-
.../runners/core/BeamFnDataReadRunnerTest.java | 112 +--
.../runners/core/BeamFnDataWriteRunnerTest.java | 120 +---
.../runners/core/BoundedSourceRunnerTest.java | 124 +---
.../beam/runners/core/FnApiDoFnRunnerTest.java | 210 ------
sdks/java/io/amqp/pom.xml | 100 ---
.../org/apache/beam/sdk/io/amqp/AmqpIO.java | 399 -----------
.../beam/sdk/io/amqp/AmqpMessageCoder.java | 79 --
.../amqp/AmqpMessageCoderProviderRegistrar.java | 44 --
.../apache/beam/sdk/io/amqp/package-info.java | 22 -
.../org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 148 ----
.../beam/sdk/io/amqp/AmqpMessageCoderTest.java | 89 ---
sdks/java/io/cassandra/pom.xml | 2 +-
.../beam/sdk/io/cassandra/CassandraIO.java | 2 +-
sdks/java/io/common/pom.xml | 2 +-
.../sdk/io/common/IOTestPipelineOptions.java | 6 +-
sdks/java/io/elasticsearch/pom.xml | 10 +-
.../sdk/io/elasticsearch/ElasticsearchIO.java | 17 +-
.../elasticsearch/ElasticSearchIOTestUtils.java | 81 +--
.../sdk/io/elasticsearch/ElasticsearchIOIT.java | 14 +-
.../io/elasticsearch/ElasticsearchIOTest.java | 36 +-
.../elasticsearch/ElasticsearchTestDataSet.java | 37 +-
sdks/java/io/google-cloud-platform/pom.xml | 14 +-
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 -
.../io/gcp/bigquery/DynamicDestinations.java | 29 +-
.../io/gcp/bigquery/GenerateShardedTable.java | 1 -
.../beam/sdk/io/gcp/bigquery/ShardedKey.java | 67 ++
.../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 74 ++
.../sdk/io/gcp/bigquery/StreamingWriteFn.java | 1 -
.../io/gcp/bigquery/StreamingWriteTables.java | 2 -
.../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 1 -
.../io/gcp/bigquery/WriteBundlesToFiles.java | 2 -
.../bigquery/WriteGroupedRecordsToFiles.java | 1 -
.../sdk/io/gcp/bigquery/WritePartition.java | 1 -
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 1 -
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 8 +-
.../io/gcp/bigtable/BigtableServiceImpl.java | 9 +-
.../sdk/io/gcp/datastore/AdaptiveThrottler.java | 103 ---
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 149 +---
.../sdk/io/gcp/datastore/MovingAverage.java | 50 --
.../sdk/io/gcp/spanner/AbstractSpannerFn.java | 58 --
.../sdk/io/gcp/spanner/CreateTransactionFn.java | 51 --
.../beam/sdk/io/gcp/spanner/MutationGroup.java | 67 --
.../io/gcp/spanner/MutationSizeEstimator.java | 9 -
.../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 65 --
.../beam/sdk/io/gcp/spanner/SpannerConfig.java | 137 ----
.../beam/sdk/io/gcp/spanner/SpannerIO.java | 616 +++++-----------
.../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 125 ----
.../beam/sdk/io/gcp/spanner/Transaction.java | 33 -
.../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 10 -
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 -
.../sdk/io/gcp/bigtable/BigtableReadIT.java | 5 +-
.../io/gcp/bigtable/BigtableTestOptions.java | 5 +
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 4 +-
.../io/gcp/datastore/AdaptiveThrottlerTest.java | 111 ---
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 92 +--
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 2 +-
.../sdk/io/gcp/spanner/FakeServiceFactory.java | 82 ---
.../gcp/spanner/MutationSizeEstimatorTest.java | 12 -
.../beam/sdk/io/gcp/spanner/RandomUtils.java | 41 --
.../sdk/io/gcp/spanner/SpannerIOReadTest.java | 281 --------
.../beam/sdk/io/gcp/spanner/SpannerIOTest.java | 244 +++++++
.../sdk/io/gcp/spanner/SpannerIOWriteTest.java | 258 -------
.../beam/sdk/io/gcp/spanner/SpannerReadIT.java | 166 -----
.../beam/sdk/io/gcp/spanner/SpannerWriteIT.java | 26 +-
sdks/java/io/hadoop-common/pom.xml | 2 +-
sdks/java/io/hadoop-file-system/pom.xml | 33 +-
sdks/java/io/hadoop/input-format/pom.xml | 2 +-
.../hadoop/inputformat/HadoopInputFormatIO.java | 2 +-
sdks/java/io/hadoop/jdk1.8-tests/pom.xml | 4 +-
.../inputformat/HIFIOWithElasticTest.java | 11 +-
sdks/java/io/hadoop/pom.xml | 2 +-
sdks/java/io/hbase/pom.xml | 26 +-
.../io/hbase/HBaseCoderProviderRegistrar.java | 40 --
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 48 +-
.../beam/sdk/io/hbase/HBaseMutationCoder.java | 42 --
.../hbase/HBaseCoderProviderRegistrarTest.java | 45 --
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 49 +-
sdks/java/io/hcatalog/pom.xml | 175 -----
.../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 492 -------------
.../beam/sdk/io/hcatalog/package-info.java | 22 -
.../io/hcatalog/EmbeddedMetastoreService.java | 87 ---
.../beam/sdk/io/hcatalog/HCatalogIOTest.java | 277 --------
.../sdk/io/hcatalog/HCatalogIOTestUtils.java | 108 ---
.../hcatalog/src/test/resources/hive-site.xml | 301 --------
sdks/java/io/jdbc/pom.xml | 4 +-
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 2 +-
sdks/java/io/jms/pom.xml | 2 +-
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +-
sdks/java/io/kafka/pom.xml | 2 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 132 ++--
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 30 -
sdks/java/io/kinesis/pom.xml | 2 +-
.../sdk/io/kinesis/CheckpointGenerator.java | 6 +-
.../beam/sdk/io/kinesis/CustomOptional.java | 111 ++-
.../io/kinesis/DynamicCheckpointGenerator.java | 52 +-
.../sdk/io/kinesis/GetKinesisRecordsResult.java | 49 +-
.../sdk/io/kinesis/KinesisClientProvider.java | 4 +-
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 281 ++++----
.../beam/sdk/io/kinesis/KinesisReader.java | 206 +++---
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 97 ++-
.../beam/sdk/io/kinesis/KinesisRecord.java | 177 +++--
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 68 +-
.../beam/sdk/io/kinesis/KinesisSource.java | 147 ++--
.../beam/sdk/io/kinesis/RecordFilter.java | 18 +-
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 37 +-
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 241 ++++---
.../sdk/io/kinesis/ShardRecordsIterator.java | 106 ++-
.../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 +++---
.../beam/sdk/io/kinesis/StartingPoint.java | 84 ++-
.../io/kinesis/StaticCheckpointGenerator.java | 27 +-
.../io/kinesis/TransientKinesisException.java | 7 +-
.../beam/sdk/io/kinesis/AmazonKinesisMock.java | 539 +++++++-------
.../beam/sdk/io/kinesis/CustomOptionalTest.java | 27 +-
.../kinesis/DynamicCheckpointGeneratorTest.java | 33 +-
.../sdk/io/kinesis/KinesisMockReadTest.java | 97 ++-
.../io/kinesis/KinesisReaderCheckpointTest.java | 52 +-
.../beam/sdk/io/kinesis/KinesisReaderIT.java | 127 ++--
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 166 +++--
.../sdk/io/kinesis/KinesisRecordCoderTest.java | 34 +-
.../beam/sdk/io/kinesis/KinesisTestOptions.java | 43 +-
.../beam/sdk/io/kinesis/KinesisUploader.java | 70 +-
.../beam/sdk/io/kinesis/RecordFilterTest.java | 52 +-
.../beam/sdk/io/kinesis/RoundRobinTest.java | 42 +-
.../sdk/io/kinesis/ShardCheckpointTest.java | 203 +++---
.../io/kinesis/ShardRecordsIteratorTest.java | 216 +++---
.../io/kinesis/SimplifiedKinesisClientTest.java | 351 +++++----
sdks/java/io/mongodb/pom.xml | 2 +-
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 2 +-
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 317 ++-------
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 37 -
sdks/java/io/mqtt/pom.xml | 2 +-
.../org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +-
sdks/java/io/pom.xml | 35 +-
sdks/java/io/xml/pom.xml | 2 +-
.../java/org/apache/beam/sdk/io/xml/XmlIO.java | 4 +-
.../org/apache/beam/sdk/io/xml/XmlSink.java | 21 +-
.../org/apache/beam/sdk/io/xml/XmlSinkTest.java | 4 +-
sdks/java/java8tests/pom.xml | 2 +-
sdks/java/javadoc/pom.xml | 19 +-
.../maven-archetypes/examples-java8/pom.xml | 2 +-
.../main/resources/archetype-resources/pom.xml | 1 +
sdks/java/maven-archetypes/examples/pom.xml | 2 +-
.../main/resources/archetype-resources/pom.xml | 1 +
sdks/java/maven-archetypes/pom.xml | 2 +-
sdks/java/maven-archetypes/starter/pom.xml | 2 +-
.../resources/projects/basic/reference/pom.xml | 2 +-
sdks/java/pom.xml | 2 +-
sdks/pom.xml | 2 +-
sdks/python/apache_beam/coders/coder_impl.py | 4 -
sdks/python/apache_beam/coders/coders.py | 7 +-
.../apache_beam/coders/coders_test_common.py | 8 -
.../examples/snippets/snippets_test.py | 16 -
.../apache_beam/examples/streaming_wordcount.py | 25 +-
.../apache_beam/examples/windowed_wordcount.py | 93 ---
sdks/python/apache_beam/io/filesystem.py | 22 +-
sdks/python/apache_beam/io/gcp/gcsio.py | 10 +-
sdks/python/apache_beam/io/gcp/pubsub.py | 180 ++---
sdks/python/apache_beam/io/gcp/pubsub_test.py | 101 +--
.../io/gcp/tests/bigquery_matcher.py | 6 +-
.../io/gcp/tests/bigquery_matcher_test.py | 2 +-
sdks/python/apache_beam/io/range_trackers.py | 130 ++++
.../apache_beam/io/range_trackers_test.py | 186 +++++
.../apache_beam/options/pipeline_options.py | 35 +-
.../options/pipeline_options_test.py | 39 +-
.../apache_beam/options/value_provider_test.py | 93 ++-
sdks/python/apache_beam/pipeline.py | 230 +-----
sdks/python/apache_beam/pipeline_test.py | 53 --
sdks/python/apache_beam/portability/__init__.py | 18 -
.../apache_beam/portability/api/__init__.py | 21 -
sdks/python/apache_beam/pvalue.py | 2 +-
sdks/python/apache_beam/runners/api/__init__.py | 21 +
.../runners/dataflow/dataflow_runner.py | 91 +--
.../runners/dataflow/dataflow_runner_test.py | 24 +-
.../runners/dataflow/internal/apiclient.py | 35 +-
.../runners/dataflow/internal/apiclient_test.py | 29 +-
.../runners/dataflow/internal/dependency.py | 69 +-
.../runners/dataflow/native_io/iobase_test.py | 39 +-
.../dataflow/native_io/streaming_create.py | 72 --
.../runners/dataflow/ptransform_overrides.py | 52 --
.../runners/direct/bundle_factory.py | 2 +-
.../apache_beam/runners/direct/direct_runner.py | 108 ---
.../runners/direct/evaluation_context.py | 73 +-
.../apache_beam/runners/direct/executor.py | 135 ++--
.../runners/direct/transform_evaluator.py | 447 +-----------
.../runners/direct/transform_result.py | 41 ++
sdks/python/apache_beam/runners/direct/util.py | 67 --
.../runners/direct/watermark_manager.py | 100 +--
.../apache_beam/runners/pipeline_context.py | 19 +-
.../runners/portability/fn_api_runner.py | 306 ++++----
.../runners/portability/fn_api_runner_test.py | 31 +-
.../runners/worker/bundle_processor.py | 426 -----------
.../apache_beam/runners/worker/data_plane.py | 28 +-
.../runners/worker/data_plane_test.py | 2 +-
.../apache_beam/runners/worker/log_handler.py | 2 +-
.../runners/worker/log_handler_test.py | 2 +-
.../runners/worker/operation_specs.py | 9 +-
.../apache_beam/runners/worker/operations.py | 1 -
.../apache_beam/runners/worker/sdk_worker.py | 370 +++++++++-
.../runners/worker/sdk_worker_main.py | 2 +-
.../runners/worker/sdk_worker_test.py | 95 ++-
sdks/python/apache_beam/testing/test_stream.py | 5 -
.../apache_beam/testing/test_stream_test.py | 68 --
sdks/python/apache_beam/transforms/combiners.py | 8 -
.../apache_beam/transforms/combiners_test.py | 7 +-
sdks/python/apache_beam/transforms/core.py | 102 ++-
.../python/apache_beam/transforms/ptransform.py | 43 +-
sdks/python/apache_beam/transforms/trigger.py | 28 +-
sdks/python/apache_beam/transforms/window.py | 4 +-
.../apache_beam/typehints/trivial_inference.py | 3 +-
.../typehints/trivial_inference_test.py | 7 -
sdks/python/apache_beam/utils/plugin.py | 42 --
sdks/python/apache_beam/utils/timestamp.py | 5 -
sdks/python/apache_beam/utils/urns.py | 2 +-
sdks/python/apache_beam/version.py | 2 +-
sdks/python/gen_protos.py | 2 +-
sdks/python/pom.xml | 2 +-
sdks/python/run_pylint.sh | 2 +-
sdks/python/setup.py | 5 +-
462 files changed, 10754 insertions(+), 21718 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 36c5cc8..bd419a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,7 +25,7 @@ sdks/python/**/*.egg
sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
-sdks/python/apache_beam/portability/api/*pb2*.*
+sdks/python/apache_beam/runners/api/*pb2*.*
# Ignore IntelliJ files.
.idea/
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/common_job_properties.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/common_job_properties.groovy b/.test-infra/jenkins/common_job_properties.groovy
index 70534c6..6d4d68b 100644
--- a/.test-infra/jenkins/common_job_properties.groovy
+++ b/.test-infra/jenkins/common_job_properties.groovy
@@ -23,12 +23,11 @@
class common_job_properties {
// Sets common top-level job properties for website repository jobs.
- static void setTopLevelWebsiteJobProperties(context,
- String branch = 'asf-site') {
+ static void setTopLevelWebsiteJobProperties(context) {
setTopLevelJobProperties(
context,
'beam-site',
- branch,
+ 'asf-site',
'beam',
30)
}
@@ -265,10 +264,8 @@ class common_job_properties {
shell('rm -rf PerfKitBenchmarker')
// Clone appropriate perfkit branch
shell('git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git')
- // Install Perfkit benchmark requirements.
+ // Install job requirements.
shell('pip install --user -r PerfKitBenchmarker/requirements.txt')
- // Install job requirements for Python SDK.
- shell('pip install --user -e sdks/python/[gcp,test]')
// Launch performance test.
shell("python PerfKitBenchmarker/pkb.py $pkbArgs")
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy
deleted file mode 100644
index 6a71bda..0000000
--- a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import common_job_properties
-
-// This job runs the Beam Python performance tests on PerfKit Benchmarker.
-job('beam_PerformanceTests_Python'){
- // Set default Beam job properties.
- common_job_properties.setTopLevelMainJobProperties(delegate)
-
- // Run job in postcommit every 6 hours, don't trigger every push.
- common_job_properties.setPostCommit(
- delegate,
- '0 */6 * * *',
- false,
- 'commits@beam.apache.org')
-
- // Allows triggering this build against pull requests.
- common_job_properties.enablePhraseTriggeringFromPullRequest(
- delegate,
- 'Python SDK Performance Test',
- 'Run Python Performance Test')
-
- def pipelineArgs = [
- project: 'apache-beam-testing',
- staging_location: 'gs://temp-storage-for-end-to-end-tests/staging-it',
- temp_location: 'gs://temp-storage-for-end-to-end-tests/temp-it',
- output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output'
- ]
- def pipelineArgList = []
- pipelineArgs.each({
- key, value -> pipelineArgList.add("--$key=$value")
- })
- def pipelineArgsJoined = pipelineArgList.join(',')
-
- def argMap = [
- beam_sdk : 'python',
- benchmarks: 'beam_integration_benchmark',
- beam_it_args: pipelineArgsJoined
- ]
-
- common_job_properties.buildPerformanceTest(delegate, argMap)
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy
index df0a2c7..f23e741 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy
@@ -37,8 +37,6 @@ matrixJob('beam_PostCommit_Java_JDK_Versions_Test') {
common_job_properties.setPostCommit(
delegate,
'0 */6 * * *',
- false,
- '', // TODO: Remove last two args once test is stable again.
false)
// Allows triggering this build against pull requests.
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy
index 6ef272c..f781b4e 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy
@@ -32,8 +32,7 @@ mavenJob('beam_PostCommit_Java_MavenInstall_Windows') {
common_job_properties.setMavenConfig(delegate, 'Maven 3.3.3 (Windows)')
// Sets that this is a PostCommit job.
- // TODO(BEAM-1042, BEAM-1045, BEAM-2269, BEAM-2299) Turn notifications back on once fixed.
- common_job_properties.setPostCommit(delegate, '0 */6 * * *', false, '', false)
+ common_job_properties.setPostCommit(delegate, '0 */6 * * *', false)
// Allows triggering this build against pull requests.
common_job_properties.enablePhraseTriggeringFromPullRequest(
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy b/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
deleted file mode 100644
index 0e2ae3f..0000000
--- a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import common_job_properties
-
-// Defines a job.
-job('beam_PreCommit_Website_Merge') {
- description('Runs website tests for mergebot.')
-
- // Set common parameters.
- common_job_properties.setTopLevelWebsiteJobProperties(delegate, 'mergebot')
-
- triggers {
- githubPush()
- }
-
- steps {
- // Run the following shell script as a build step.
- shell '''
- # Install RVM per instructions at https://rvm.io/rvm/install.
- RVM_GPG_KEY=409B6B1796C275462A1703113804BB82D39DC0E3
- gpg --keyserver hkp://keys.gnupg.net --recv-keys $RVM_GPG_KEY
-
- \\curl -sSL https://get.rvm.io | bash
- source /home/jenkins/.rvm/scripts/rvm
-
- # Install Ruby.
- RUBY_VERSION_NUM=2.3.0
- rvm install ruby $RUBY_VERSION_NUM --autolibs=read-only
-
- # Install Bundler gem
- PATH=~/.gem/ruby/$RUBY_VERSION_NUM/bin:$PATH
- GEM_PATH=~/.gem/ruby/$RUBY_VERSION_NUM/:$GEM_PATH
- gem install bundler --user-install
-
- # Install all needed gems.
- bundle install --path ~/.gem/
-
- # Build the new site and test it.
- rm -fr ./content/
- bundle exec rake test
- '''.stripIndent().trim()
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index ae64a79..701e4fe 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -34,6 +34,10 @@
<packaging>jar</packaging>
+ <properties>
+ <spark.version>1.6.2</spark.version>
+ </properties>
+
<profiles>
<!--
@@ -62,12 +66,6 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-apex</artifactId>
<scope>runtime</scope>
- <exclusions>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<!--
Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
@@ -97,12 +95,6 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<scope>runtime</scope>
- <exclusions>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</profile>
@@ -124,11 +116,13 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 49865ba..5e6df9c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -17,12 +17,11 @@
*/
package org.apache.beam.examples.common;
-import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Verify.verifyNotNull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -54,12 +53,22 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
@Override
public PDone expand(PCollection<String> input) {
+ // filenamePrefix may contain a directory and a filename component. Pull out only the filename
+ // component from that path for the PerWindowFiles.
+ String prefix = "";
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
- TextIO.Write write =
- TextIO.write()
- .to(new PerWindowFiles(resource))
- .withTempDirectory(resource.getCurrentDirectory())
- .withWindowedWrites();
+ if (!resource.isDirectory()) {
+ prefix = verifyNotNull(
+ resource.getFilename(),
+ "A non-directory resource should have a non-null filename: %s",
+ resource);
+ }
+
+
+ TextIO.Write write = TextIO.write()
+ .to(resource.getCurrentDirectory())
+ .withFilenamePolicy(new PerWindowFiles(prefix))
+ .withWindowedWrites();
if (numShards != null) {
write = write.withNumShards(numShards);
}
@@ -74,36 +83,31 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
*/
public static class PerWindowFiles extends FilenamePolicy {
- private final ResourceId baseFilename;
+ private final String prefix;
- public PerWindowFiles(ResourceId baseFilename) {
- this.baseFilename = baseFilename;
+ public PerWindowFiles(String prefix) {
+ this.prefix = prefix;
}
public String filenamePrefixForWindow(IntervalWindow window) {
- String prefix =
- baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
return String.format("%s-%s-%s",
prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
}
@Override
- public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
+ public ResourceId windowedFilename(
+ ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow window = (IntervalWindow) context.getWindow();
- String filename =
- String.format(
- "%s-%s-of-%s%s",
- filenamePrefixForWindow(window),
- context.getShardNumber(),
- context.getNumShards(),
- outputFileHints.getSuggestedFilenameSuffix());
- return baseFilename
- .getCurrentDirectory()
- .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ String filename = String.format(
+ "%s-%s-of-%s%s",
+ filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+ extension);
+ return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
- public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+ public ResourceId unwindowedFilename(
+ ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index bec7952..eb7e4c4 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -150,8 +149,7 @@ public class WindowedWordCountIT {
String outputPrefix = options.getOutput();
- PerWindowFiles filenamePolicy =
- new PerWindowFiles(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
+ PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix);
List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index 6fd29a4..56295a4 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -35,6 +35,10 @@
<packaging>jar</packaging>
+ <properties>
+ <spark.version>1.6.2</spark.version>
+ </properties>
+
<profiles>
<!--
The direct runner is available by default.
@@ -62,12 +66,6 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-apex</artifactId>
<scope>runtime</scope>
- <exclusions>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<!--
Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
@@ -97,12 +95,6 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<scope>runtime</scope>
- <exclusions>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</profile>
@@ -124,11 +116,13 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
index 1d60198..e6c8ddb 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -18,6 +18,7 @@
package org.apache.beam.examples.complete.game.utils;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
import java.io.Serializable;
import java.util.ArrayList;
@@ -27,7 +28,6 @@ import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -111,12 +111,21 @@ public class WriteToText<InputT>
checkArgument(
input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());
+ // filenamePrefix may contain a directory and a filename component. Pull out only the filename
+ // component from that path for the PerWindowFiles.
+ String prefix = "";
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+ if (!resource.isDirectory()) {
+ prefix = verifyNotNull(
+ resource.getFilename(),
+ "A non-directory resource should have a non-null filename: %s",
+ resource);
+ }
return input.apply(
TextIO.write()
- .to(new PerWindowFiles(resource))
- .withTempDirectory(resource.getCurrentDirectory())
+ .to(resource.getCurrentDirectory())
+ .withFilenamePolicy(new PerWindowFiles(prefix))
.withWindowedWrites()
.withNumShards(3));
}
@@ -130,33 +139,31 @@ public class WriteToText<InputT>
*/
protected static class PerWindowFiles extends FilenamePolicy {
- private final ResourceId prefix;
+ private final String prefix;
- public PerWindowFiles(ResourceId prefix) {
+ public PerWindowFiles(String prefix) {
this.prefix = prefix;
}
public String filenamePrefixForWindow(IntervalWindow window) {
- String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
- return String.format(
- "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
+ return String.format("%s-%s-%s",
+ prefix, formatter.print(window.start()), formatter.print(window.end()));
}
@Override
- public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
+ public ResourceId windowedFilename(
+ ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow window = (IntervalWindow) context.getWindow();
- String filename =
- String.format(
- "%s-%s-of-%s%s",
- filenamePrefixForWindow(window),
- context.getShardNumber(),
- context.getNumShards(),
- outputFileHints.getSuggestedFilenameSuffix());
- return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ String filename = String.format(
+ "%s-%s-of-%s%s",
+ filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+ extension);
+ return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
- public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+ public ResourceId unwindowedFilename(
+ ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 611e2b3..745c210 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -276,8 +276,6 @@ public class LeaderBoardTest implements Serializable {
.addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
event(TestUser.BLUE_TWO, 3, Duration.ZERO),
event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
- // Move the watermark to the end of the window to output on time
- .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION))
// Move the watermark past the end of the allowed lateness plus the end of the window
.advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
.plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 51f4c35..a7e61dd 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 987760f..1d8d4b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
<url>http://beam.apache.org/</url>
<inceptionYear>2016</inceptionYear>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<licenses>
<license>
@@ -101,14 +101,13 @@
<beamSurefireArgline />
<!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
- <apache.commons.compress.version>1.14</apache.commons.compress.version>
- <apache.commons.lang.version>3.6</apache.commons.lang.version>
- <apache.commons.text.version>1.1</apache.commons.text.version>
+ <apache.commons.lang.version>3.5</apache.commons.lang.version>
+ <apache.commons.compress.version>1.9</apache.commons.compress.version>
<apex.kryo.version>2.24.0</apex.kryo.version>
<api-common.version>1.0.0-rc2</api-common.version>
<avro.version>1.8.2</avro.version>
<bigquery.version>v2-rev295-1.22.0</bigquery.version>
- <bigtable.version>0.9.7.1</bigtable.version>
+ <bigtable.version>0.9.6.2</bigtable.version>
<cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
<pubsubgrpc.version>0.1.0</pubsubgrpc.version>
<clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
@@ -127,14 +126,8 @@
<guava.version>20.0</guava.version>
<grpc.version>1.2.0</grpc.version>
<grpc-google-common-protos.version>0.1.9</grpc-google-common-protos.version>
- <!--
- This is the version of Hadoop used to compile the module that depend on Hadoop.
- This dependency is defined with a provided scope.
- Users must supply their own Hadoop version at runtime.
- -->
- <hadoop.version>2.7.3</hadoop.version>
<hamcrest.version>1.3</hamcrest.version>
- <jackson.version>2.8.9</jackson.version>
+ <jackson.version>2.8.8</jackson.version>
<findbugs.version>3.0.1</findbugs.version>
<joda.version>2.4</joda.version>
<junit.version>4.12</junit.version>
@@ -144,8 +137,8 @@
<protobuf.version>3.2.0</protobuf.version>
<pubsub.version>v1-rev10-1.22.0</pubsub.version>
<slf4j.version>1.7.14</slf4j.version>
- <spanner.version>0.20.0-beta</spanner.version>
- <spark.version>1.6.3</spark.version>
+ <spanner.version>0.16.0-beta</spanner.version>
+ <spark.version>1.6.2</spark.version>
<spring.version>4.3.5.RELEASE</spring.version>
<stax2.version>3.1.4</stax2.version>
<storage.version>v1-rev71-1.22.0</storage.version>
@@ -159,7 +152,7 @@
<failsafe-plugin.version>2.20</failsafe-plugin.version>
<maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
<maven-dependency-plugin.version>3.0.1</maven-dependency-plugin.version>
- <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
+ <maven-exec-plugin.version>1.4.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
<maven-resources-plugin.version>3.0.2</maven-resources-plugin.version>
<maven-shade-plugin.version>3.0.0</maven-shade-plugin.version>
@@ -167,7 +160,6 @@
<compiler.error.flag>-Werror</compiler.error.flag>
<compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag>
<compiler.default.exclude>nothing</compiler.default.exclude>
- <gax-grpc.version>0.20.0</gax-grpc.version>
</properties>
<packaging>pom</packaging>
@@ -429,18 +421,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-amqp</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-cassandra</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
@@ -478,12 +458,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-hcatalog</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
@@ -538,13 +512,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-java</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${project.version}</version>
</dependency>
@@ -598,12 +565,6 @@
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-text</artifactId>
- <version>${apache.commons.text.version}</version>
- </dependency>
-
- <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
@@ -664,12 +625,6 @@
</dependency>
<dependency>
- <groupId>com.google.api</groupId>
- <artifactId>gax-grpc</artifactId>
- <version>${gax-grpc.version}</version>
- </dependency>
-
- <dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>${google-clients.version}</version>
@@ -884,11 +839,6 @@
</dependency>
<dependency>
- <groupId>com.google.cloud</groupId>
- <artifactId>google-cloud-core-grpc</artifactId>
- <version>${grpc.version}</version>
- </dependency>
- <dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-protos</artifactId>
<version>${bigtable.version}</version>
@@ -1100,42 +1050,6 @@
<version>${snappy-java.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <version>${spark.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <version>${spark.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-network-common_2.10</artifactId>
- <version>${spark.version}</version>
- </dependency>
-
<!-- Testing -->
<dependency>
@@ -1205,27 +1119,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index fd5aafb..4a36bec 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -75,13 +75,6 @@
<artifactId>apex-engine</artifactId>
<version>${apex.core.version}</version>
<scope>runtime</scope>
- <exclusions>
- <!-- Fix build on JDK-9 -->
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<!-- Beam -->
@@ -191,13 +184,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-java</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
@@ -263,12 +249,12 @@
<configuration>
<ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:${apex.core.version}</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::${apache.commons.lang.version}</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.4</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::${apex.kryo.version}</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.3.0</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:${hadoop.version}</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:20.0</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index fd0a1c9..c595b3f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -62,6 +62,8 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.AsIterable;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -212,7 +214,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
* @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
*/
public static class CreateApexPCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+ extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
private static final long serialVersionUID = 1L;
private PCollectionView<ViewT> view;
@@ -226,13 +228,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.<ElemT>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
- }
-
- public PCollectionView<ViewT> getView() {
+ public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
return view;
}
}
@@ -245,7 +241,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
private static class StreamingWrapSingletonInList<T>
- extends PTransform<PCollection<T>, PCollection<T>> {
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
private static final long serialVersionUID = 1L;
CreatePCollectionView<T, T> transform;
@@ -258,11 +254,10 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- public PCollection<T> expand(PCollection<T> input) {
- input
+ public PCollectionView<T> expand(PCollection<T> input) {
+ return input
.apply(ParDo.of(new WrapAsList<T>()))
- .apply(CreateApexPCollectionView.<List<T>, T>of(transform.getView()));
- return input;
+ .apply(CreateApexPCollectionView.<T, T>of(transform.getView()));
}
@Override
@@ -272,12 +267,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
static class Factory<T>
extends SingleInputOutputOverrideFactory<
- PCollection<T>, PCollection<T>,
+ PCollection<T>, PCollectionView<T>,
CreatePCollectionView<T, T>> {
@Override
- public PTransformReplacement<PCollection<T>, PCollection<T>> getReplacementTransform(
- AppliedPTransform<PCollection<T>, PCollection<T>, CreatePCollectionView<T, T>>
- transform) {
+ public PTransformReplacement<PCollection<T>, PCollectionView<T>>
+ getReplacementTransform(
+ AppliedPTransform<
+ PCollection<T>, PCollectionView<T>,
+ CreatePCollectionView<T, T>>
+ transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
new StreamingWrapSingletonInList<>(transform.getTransform()));
@@ -286,19 +284,18 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
private static class StreamingViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollection<T>> {
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
private static final long serialVersionUID = 1L;
- private final PCollectionView<Iterable<T>> view;
- private StreamingViewAsIterable(PCollectionView<Iterable<T>> view) {
- this.view = view;
- }
+ private StreamingViewAsIterable() {}
@Override
- public PCollection<T> expand(PCollection<T> input) {
- return ((PCollection<T>)
- input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()))
- .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view));
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateApexPCollectionView.<T, Iterable<T>> of(view));
}
@Override
@@ -308,17 +305,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
static class Factory<T>
extends SingleInputOutputOverrideFactory<
- PCollection<T>, PCollection<T>, CreatePCollectionView<T, Iterable<T>>> {
+ PCollection<T>, PCollectionView<Iterable<T>>, View.AsIterable<T>> {
@Override
- public PTransformReplacement<PCollection<T>, PCollection<T>>
+ public PTransformReplacement<PCollection<T>, PCollectionView<Iterable<T>>>
getReplacementTransform(
- AppliedPTransform<
- PCollection<T>, PCollection<T>,
- CreatePCollectionView<T, Iterable<T>>>
+ AppliedPTransform<PCollection<T>, PCollectionView<Iterable<T>>, AsIterable<T>>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new StreamingViewAsIterable<T>(transform.getTransform().getView()));
+ new StreamingViewAsIterable<T>());
}
}
}
@@ -381,7 +376,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
transform) {
return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
- SplittableParDo.forJavaParDo(transform.getTransform()));
+ new SplittableParDo<>(transform.getTransform()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 02f53ec..bda074b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,6 +154,7 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
unboundedSource, true, context.getPipelineOptions());
context.addOperator(operator, operator.output);
}
+
}
private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
@@ -160,10 +162,11 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
private static final long serialVersionUID = 1L;
@Override
- public void translate(
- CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context) {
- context.addView(transform.getView());
- LOG.debug("view {}", transform.getView().getName());
+ public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
+ TranslationContext context) {
+ PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
+ context.addView(view);
+ LOG.debug("view {}", view.getName());
}
}
@@ -174,8 +177,9 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
@Override
public void translate(
CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
- context.addView(transform.getView());
- LOG.debug("view {}", transform.getView().getName());
+ PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
+ context.addView(view);
+ LOG.debug("view {}", view.getName());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 94d13e1..aff3863 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -34,7 +34,6 @@ import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
-import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -94,8 +93,7 @@ class TranslationContext {
}
public <InputT extends PValue> InputT getInput() {
- return (InputT)
- Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
+ return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values());
}
public Map<TupleTag<?>, PValue> getOutputs() {
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index c3cbab2..809ca2a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -359,7 +359,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
}
if (sideInputs.isEmpty()) {
- outputWatermark(mark);
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", mark);
+ }
+ output.emit(mark);
return;
}
@@ -367,20 +370,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
Math.min(pushedBackWatermark.get(), currentInputWatermark);
if (potentialOutputWatermark > currentOutputWatermark) {
currentOutputWatermark = potentialOutputWatermark;
- outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
- }
- }
-
- private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
- if (traceTuples) {
- LOG.debug("\nemitting {}\n", mark);
- }
- output.emit(mark);
- if (!additionalOutputPortMapping.isEmpty()) {
- for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput :
- additionalOutputPortMapping.values()) {
- additionalOutput.emit(mark);
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
}
+ output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index ba75746..e76096e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -123,15 +123,11 @@ public class WordCountTest {
options.setInputFile(new File(inputFile).getAbsolutePath());
String outputFilePrefix = "target/wordcountresult.txt";
options.setOutput(outputFilePrefix);
+ WordCountTest.main(TestPipeline.convertToArgs(options));
File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
- Assert.assertTrue(!outFile1.exists() || outFile1.delete());
- Assert.assertTrue(!outFile2.exists() || outFile2.delete());
-
- WordCountTest.main(TestPipeline.convertToArgs(options));
-
- Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists());
+ Assert.assertTrue(outFile1.exists() && outFile2.exists());
HashSet<String> results = new HashSet<>();
results.addAll(FileUtils.readLines(outFile1));
results.addAll(FileUtils.readLines(outFile2));