You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:54 UTC

[27/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

Revert "[BEAM-2610] This closes #3553"

This reverts commit ec494f675aa73fbdc7929f9592f33951941962b0, reversing
changes made to d89d1ee1a3085269cdf44ec50e29a95c8f43757b.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1b2b96a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1b2b96a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1b2b96a

Branch: refs/heads/DSL_SQL
Commit: c1b2b96a438b86a8b0023f6943dcf0a4f238ba39
Parents: 97a156c
Author: mingmxu <mi...@ebay.com>
Authored: Wed Jul 19 14:26:11 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Wed Jul 19 14:26:11 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 .../jenkins/common_job_properties.groovy        |   9 +-
 .../job_beam_PerformanceTests_Python.groovy     |  58 --
 ..._beam_PostCommit_Java_JDKVersionsTest.groovy |   2 -
 ..._PostCommit_Java_MavenInstall_Windows.groovy |   3 +-
 .../job_beam_PreCommit_Website_Merge.groovy     |  59 --
 examples/java/pom.xml                           |  20 +-
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../beam/examples/WindowedWordCountIT.java      |   4 +-
 examples/java8/pom.xml                          |  20 +-
 .../complete/game/utils/WriteToText.java        |  43 +-
 .../examples/complete/game/LeaderBoardTest.java |   2 -
 examples/pom.xml                                |   2 +-
 pom.xml                                         | 123 +---
 runners/apex/pom.xml                            |  20 +-
 .../apache/beam/runners/apex/ApexRunner.java    |  61 +-
 .../translation/ApexPipelineTranslator.java     |  16 +-
 .../apex/translation/TranslationContext.java    |   4 +-
 .../operators/ApexParDoOperator.java            |  21 +-
 .../runners/apex/examples/WordCountTest.java    |   8 +-
 .../utils/ApexStateInternalsTest.java           | 411 ++++++++---
 runners/core-construction-java/pom.xml          |   2 +-
 .../CreatePCollectionViewTranslation.java       |   4 +-
 .../construction/ElementAndRestriction.java     |  42 ++
 .../ElementAndRestrictionCoder.java             |  88 +++
 .../construction/PCollectionTranslation.java    |  16 -
 .../core/construction/PTransformMatchers.java   | 109 +--
 .../construction/PTransformTranslation.java     |   7 +-
 .../core/construction/ParDoTranslation.java     |  82 +--
 .../construction/RunnerPCollectionView.java     |  31 +-
 .../core/construction/SplittableParDo.java      | 124 +---
 .../construction/TestStreamTranslation.java     |  49 +-
 .../core/construction/TransformInputs.java      |  50 --
 .../WindowingStrategyTranslation.java           |  27 +-
 .../construction/WriteFilesTranslation.java     |  67 +-
 .../ElementAndRestrictionCoderTest.java         | 126 ++++
 .../PCollectionTranslationTest.java             |  22 -
 .../construction/PTransformMatchersTest.java    |  54 +-
 .../core/construction/ParDoTranslationTest.java |  28 +-
 .../core/construction/SplittableParDoTest.java  |  18 +-
 .../core/construction/TransformInputsTest.java  | 166 -----
 .../WindowingStrategyTranslationTest.java       |   3 -
 .../construction/WriteFilesTranslationTest.java |  62 +-
 runners/core-java/pom.xml                       |   2 +-
 .../core/LateDataDroppingDoFnRunner.java        |  33 +-
 ...eBoundedSplittableProcessElementInvoker.java |  40 +-
 .../beam/runners/core/ProcessFnRunner.java      |  16 +-
 .../beam/runners/core/ReduceFnRunner.java       | 135 ++--
 .../beam/runners/core/SimpleDoFnRunner.java     |  20 -
 .../core/SplittableParDoViaKeyedWorkItems.java  |  58 +-
 .../core/SplittableProcessElementInvoker.java   |  25 +-
 .../beam/runners/core/SystemReduceFn.java       |   6 -
 .../core/triggers/AfterAllStateMachine.java     |  25 +-
 .../AfterDelayFromFirstElementStateMachine.java |   6 +-
 .../core/triggers/AfterFirstStateMachine.java   |  20 +-
 .../core/triggers/AfterPaneStateMachine.java    |   6 +-
 .../triggers/AfterWatermarkStateMachine.java    |   7 +-
 .../triggers/ExecutableTriggerStateMachine.java |  23 +-
 .../core/triggers/NeverStateMachine.java        |   5 +-
 .../core/triggers/TriggerStateMachine.java      |  27 +
 .../core/InMemoryStateInternalsTest.java        | 569 +++++++++++++--
 ...ndedSplittableProcessElementInvokerTest.java |  47 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   | 374 +---------
 .../beam/runners/core/ReduceFnTester.java       |  48 +-
 .../core/SplittableParDoProcessFnTest.java      | 117 +--
 .../beam/runners/core/StateInternalsTest.java   | 613 ----------------
 .../beam/runners/core/WindowMatchers.java       |  15 -
 .../triggers/AfterFirstStateMachineTest.java    |   5 +-
 .../AfterWatermarkStateMachineTest.java         |   7 +-
 .../core/triggers/StubTriggerStateMachine.java  |   7 +-
 runners/direct-java/pom.xml                     |   7 +-
 .../beam/runners/direct/CommittedResult.java    |  12 +-
 .../apache/beam/runners/direct/DirectGraph.java |  38 +-
 .../beam/runners/direct/DirectGraphVisitor.java |  48 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  13 +-
 .../direct/DirectGroupByKeyOverrideFactory.java |  14 +-
 .../beam/runners/direct/DirectRegistrar.java    |   2 +-
 .../beam/runners/direct/DirectRunner.java       |  64 +-
 .../beam/runners/direct/DirectTestOptions.java  |  42 --
 .../beam/runners/direct/EvaluationContext.java  |  26 +-
 .../direct/ExecutorServiceParallelExecutor.java |  27 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |   9 +-
 .../direct/ParDoMultiOverrideFactory.java       | 121 +---
 ...littableProcessElementsEvaluatorFactory.java |  37 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |  12 +-
 .../direct/TestStreamEvaluatorFactory.java      |  20 +-
 .../runners/direct/ViewEvaluatorFactory.java    |   8 +-
 .../runners/direct/ViewOverrideFactory.java     |  69 +-
 .../beam/runners/direct/WatermarkManager.java   |  18 +-
 .../direct/WriteWithShardingFactory.java        |  34 +-
 .../runners/direct/CommittedResultTest.java     |  17 +-
 .../runners/direct/DirectGraphVisitorTest.java  |  10 +-
 .../beam/runners/direct/DirectGraphs.java       |   7 -
 .../runners/direct/DirectRegistrarTest.java     |   2 +-
 .../runners/direct/EvaluationContextTest.java   |   7 +-
 .../ImmutabilityEnforcementFactoryTest.java     |   4 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   7 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |  65 +-
 .../runners/direct/TransformExecutorTest.java   |  12 +-
 .../direct/ViewEvaluatorFactoryTest.java        |   8 +-
 .../runners/direct/ViewOverrideFactoryTest.java |  37 +-
 .../direct/WatermarkCallbackExecutorTest.java   |   1 -
 .../runners/direct/WatermarkManagerTest.java    |  16 +-
 .../direct/WriteWithShardingFactoryTest.java    |  44 +-
 runners/flink/pom.xml                           |  11 +-
 .../runners/flink/CreateStreamingFlinkView.java | 154 ----
 .../flink/FlinkBatchTranslationContext.java     |   3 +-
 .../FlinkPipelineExecutionEnvironment.java      |   2 -
 .../flink/FlinkStreamingPipelineTranslator.java |  86 ++-
 .../FlinkStreamingTransformTranslators.java     |  36 +-
 .../flink/FlinkStreamingTranslationContext.java |   3 +-
 .../flink/FlinkStreamingViewOverrides.java      | 372 ++++++++++
 .../runners/flink/FlinkTransformOverrides.java  |  53 --
 .../streaming/SplittableDoFnOperator.java       |  16 +-
 .../streaming/state/FlinkStateInternals.java    | 425 ++++++-----
 .../FlinkBroadcastStateInternalsTest.java       | 242 +++++--
 .../FlinkKeyGroupStateInternalsTest.java        | 359 +++++-----
 .../streaming/FlinkSplitStateInternalsTest.java | 132 ++--
 .../streaming/FlinkStateInternalsTest.java      | 343 ++++++++-
 runners/google-cloud-dataflow-java/pom.xml      |  10 +-
 .../dataflow/BatchStatefulParDoOverrides.java   |   4 -
 .../runners/dataflow/BatchViewOverrides.java    | 182 +++--
 .../runners/dataflow/CreateDataflowView.java    |   8 +-
 .../dataflow/DataflowPipelineTranslator.java    |  62 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 133 +---
 .../dataflow/SplittableParDoOverrides.java      |  76 --
 .../dataflow/StreamingViewOverrides.java        |  10 +-
 .../runners/dataflow/TransformTranslator.java   |   4 +-
 .../runners/dataflow/util/PropertyNames.java    |   1 -
 .../beam/runners/dataflow/util/TimeUtil.java    |  24 +-
 .../DataflowPipelineTranslatorTest.java         |  95 +--
 .../runners/dataflow/DataflowRunnerTest.java    | 198 +-----
 .../runners/dataflow/util/TimeUtilTest.java     |   6 -
 runners/pom.xml                                 |   2 +-
 runners/spark/pom.xml                           |  70 +-
 .../spark/SparkNativePipelineVisitor.java       |   3 +-
 .../apache/beam/runners/spark/SparkRunner.java  |   9 +-
 .../beam/runners/spark/TestSparkRunner.java     |   2 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   6 +-
 .../spark/stateful/SparkTimerInternals.java     |  18 +-
 .../spark/translation/EvaluationContext.java    |   4 +-
 .../spark/translation/TransformTranslator.java  |  50 +-
 .../spark/util/GlobalWatermarkHolder.java       | 127 +---
 .../spark/GlobalWatermarkHolderTest.java        |  18 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  26 +-
 .../spark/stateful/SparkStateInternalsTest.java |  66 --
 .../spark/translation/StorageLevelTest.java     |   4 +-
 sdks/common/fn-api/pom.xml                      |   2 +-
 .../fn-api/src/main/proto/beam_fn_api.proto     | 237 ++++--
 sdks/common/pom.xml                             |   2 +-
 sdks/common/runner-api/pom.xml                  |   2 +-
 .../src/main/proto/beam_runner_api.proto        |  26 +-
 sdks/java/build-tools/pom.xml                   |   2 +-
 .../src/main/resources/beam/findbugs-filter.xml |   9 -
 sdks/java/core/pom.xml                          |   2 +-
 .../apache/beam/sdk/coders/ShardedKeyCoder.java |  66 --
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 220 +++---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  32 +-
 .../apache/beam/sdk/io/CompressedSource.java    |  40 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 274 ++-----
 .../beam/sdk/io/DynamicFileDestinations.java    | 115 ---
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 513 ++++++-------
 .../apache/beam/sdk/io/OffsetBasedSource.java   |  22 +-
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  44 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 712 ++++---------------
 .../java/org/apache/beam/sdk/io/TextSink.java   |  22 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 647 ++++++-----------
 .../beam/sdk/io/range/ByteKeyRangeTracker.java  |  22 +-
 .../apache/beam/sdk/io/range/OffsetRange.java   | 101 ---
 .../beam/sdk/io/range/OffsetRangeTracker.java   |   3 -
 .../sdk/options/PipelineOptionsFactory.java     |  18 +-
 .../sdk/options/PipelineOptionsValidator.java   |  34 +-
 .../sdk/options/ProxyInvocationHandler.java     |  19 +-
 .../beam/sdk/runners/TransformHierarchy.java    | 165 +----
 .../apache/beam/sdk/testing/StaticWindows.java  |   5 -
 .../org/apache/beam/sdk/testing/TestStream.java |  12 -
 .../org/apache/beam/sdk/transforms/Combine.java |  30 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |  52 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  21 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  41 +-
 .../sdk/transforms/SerializableFunctions.java   |  50 --
 .../org/apache/beam/sdk/transforms/View.java    |  38 +-
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  27 -
 .../reflect/ByteBuddyOnTimerInvokerFactory.java |  73 +-
 .../sdk/transforms/reflect/DoFnInvoker.java     |  17 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  33 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  44 +-
 .../reflect/OnTimerMethodSpecifier.java         |  37 -
 .../transforms/splittabledofn/OffsetRange.java  |  77 ++
 .../splittabledofn/OffsetRangeTracker.java      |  11 -
 .../splittabledofn/RestrictionTracker.java      |  11 +-
 .../sdk/transforms/windowing/GlobalWindows.java |   5 -
 .../windowing/PartitioningWindowFn.java         |   5 -
 .../transforms/windowing/SlidingWindows.java    |   5 -
 .../beam/sdk/transforms/windowing/Window.java   |  32 -
 .../beam/sdk/transforms/windowing/WindowFn.java |  11 -
 .../apache/beam/sdk/util/IdentityWindowFn.java  |   5 -
 .../org/apache/beam/sdk/values/PCollection.java |  12 -
 .../beam/sdk/values/PCollectionViews.java       |  38 -
 .../org/apache/beam/sdk/values/PValueBase.java  |  12 +
 .../org/apache/beam/sdk/values/ShardedKey.java  |  65 --
 .../beam/sdk/values/WindowingStrategy.java      |  46 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  85 +--
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  | 135 ++--
 .../sdk/io/DrunkWritableByteChannelFactory.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  93 +--
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  56 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 326 +--------
 .../org/apache/beam/sdk/io/WriteFilesTest.java  | 366 ++--------
 .../options/PipelineOptionsValidatorTest.java   |  44 --
 .../sdk/options/ProxyInvocationHandlerTest.java |  19 -
 .../sdk/runners/TransformHierarchyTest.java     | 197 -----
 .../sdk/testing/PCollectionViewTesting.java     |   8 -
 .../apache/beam/sdk/transforms/CombineTest.java | 365 ++++------
 .../beam/sdk/transforms/DoFnTesterTest.java     |  32 -
 .../beam/sdk/transforms/GroupByKeyTest.java     |  39 -
 .../apache/beam/sdk/transforms/ParDoTest.java   | 165 -----
 .../beam/sdk/transforms/SplittableDoFnTest.java | 155 +---
 .../transforms/reflect/DoFnInvokersTest.java    |  93 +--
 .../DoFnSignaturesProcessElementTest.java       |   2 +-
 .../DoFnSignaturesSplittableDoFnTest.java       |  83 +--
 .../transforms/reflect/DoFnSignaturesTest.java  |  14 -
 .../splittabledofn/OffsetRangeTrackerTest.java  |   1 -
 .../windowing/SlidingWindowsTest.java           |  30 +-
 .../google-cloud-platform-core/pom.xml          |   2 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |   2 +-
 .../sdk/util/RetryHttpRequestInitializer.java   | 147 ++--
 .../extensions/gcp/GcpCoreApiSurfaceTest.java   |  48 +-
 .../util/RetryHttpRequestInitializerTest.java   |  31 +-
 sdks/java/extensions/jackson/pom.xml            |   2 +-
 sdks/java/extensions/join-library/pom.xml       |   2 +-
 sdks/java/extensions/pom.xml                    |   2 +-
 sdks/java/extensions/protobuf/pom.xml           |   2 +-
 sdks/java/extensions/sorter/pom.xml             |   8 +-
 sdks/java/harness/pom.xml                       |  18 +-
 .../harness/control/ProcessBundleHandler.java   | 295 ++++++--
 .../fn/harness/control/RegisterHandler.java     |   2 +-
 .../beam/runners/core/BeamFnDataReadRunner.java |  70 +-
 .../runners/core/BeamFnDataWriteRunner.java     |  67 +-
 .../beam/runners/core/BoundedSourceRunner.java  |  74 +-
 .../beam/runners/core/FnApiDoFnRunner.java      | 547 --------------
 .../runners/core/PTransformRunnerFactory.java   |  81 ---
 .../control/ProcessBundleHandlerTest.java       | 521 ++++++++++++--
 .../fn/harness/control/RegisterHandlerTest.java |   8 +-
 .../runners/core/BeamFnDataReadRunnerTest.java  | 112 +--
 .../runners/core/BeamFnDataWriteRunnerTest.java | 120 +---
 .../runners/core/BoundedSourceRunnerTest.java   | 124 +---
 .../beam/runners/core/FnApiDoFnRunnerTest.java  | 210 ------
 sdks/java/io/amqp/pom.xml                       | 100 ---
 .../org/apache/beam/sdk/io/amqp/AmqpIO.java     | 399 -----------
 .../beam/sdk/io/amqp/AmqpMessageCoder.java      |  79 --
 .../amqp/AmqpMessageCoderProviderRegistrar.java |  44 --
 .../apache/beam/sdk/io/amqp/package-info.java   |  22 -
 .../org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 148 ----
 .../beam/sdk/io/amqp/AmqpMessageCoderTest.java  |  89 ---
 sdks/java/io/cassandra/pom.xml                  |   2 +-
 .../beam/sdk/io/cassandra/CassandraIO.java      |   2 +-
 sdks/java/io/common/pom.xml                     |   2 +-
 .../sdk/io/common/IOTestPipelineOptions.java    |   6 +-
 sdks/java/io/elasticsearch/pom.xml              |  10 +-
 .../sdk/io/elasticsearch/ElasticsearchIO.java   |  17 +-
 .../elasticsearch/ElasticSearchIOTestUtils.java |  81 +--
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java |  14 +-
 .../io/elasticsearch/ElasticsearchIOTest.java   |  36 +-
 .../elasticsearch/ElasticsearchTestDataSet.java |  37 +-
 sdks/java/io/google-cloud-platform/pom.xml      |  14 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |   2 -
 .../io/gcp/bigquery/DynamicDestinations.java    |  29 +-
 .../io/gcp/bigquery/GenerateShardedTable.java   |   1 -
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java    |  67 ++
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    |  74 ++
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   1 -
 .../io/gcp/bigquery/StreamingWriteTables.java   |   2 -
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |   1 -
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |   2 -
 .../bigquery/WriteGroupedRecordsToFiles.java    |   1 -
 .../sdk/io/gcp/bigquery/WritePartition.java     |   1 -
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |   1 -
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |   8 +-
 .../io/gcp/bigtable/BigtableServiceImpl.java    |   9 +-
 .../sdk/io/gcp/datastore/AdaptiveThrottler.java | 103 ---
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 149 +---
 .../sdk/io/gcp/datastore/MovingAverage.java     |  50 --
 .../sdk/io/gcp/spanner/AbstractSpannerFn.java   |  58 --
 .../sdk/io/gcp/spanner/CreateTransactionFn.java |  51 --
 .../beam/sdk/io/gcp/spanner/MutationGroup.java  |  67 --
 .../io/gcp/spanner/MutationSizeEstimator.java   |   9 -
 .../sdk/io/gcp/spanner/NaiveSpannerReadFn.java  |  65 --
 .../beam/sdk/io/gcp/spanner/SpannerConfig.java  | 137 ----
 .../beam/sdk/io/gcp/spanner/SpannerIO.java      | 616 +++++-----------
 .../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 125 ----
 .../beam/sdk/io/gcp/spanner/Transaction.java    |  33 -
 .../beam/sdk/io/gcp/GcpApiSurfaceTest.java      |  10 -
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   2 -
 .../sdk/io/gcp/bigtable/BigtableReadIT.java     |   5 +-
 .../io/gcp/bigtable/BigtableTestOptions.java    |   5 +
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    |   4 +-
 .../io/gcp/datastore/AdaptiveThrottlerTest.java | 111 ---
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  92 +--
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java   |   2 +-
 .../sdk/io/gcp/spanner/FakeServiceFactory.java  |  82 ---
 .../gcp/spanner/MutationSizeEstimatorTest.java  |  12 -
 .../beam/sdk/io/gcp/spanner/RandomUtils.java    |  41 --
 .../sdk/io/gcp/spanner/SpannerIOReadTest.java   | 281 --------
 .../beam/sdk/io/gcp/spanner/SpannerIOTest.java  | 244 +++++++
 .../sdk/io/gcp/spanner/SpannerIOWriteTest.java  | 258 -------
 .../beam/sdk/io/gcp/spanner/SpannerReadIT.java  | 166 -----
 .../beam/sdk/io/gcp/spanner/SpannerWriteIT.java |  26 +-
 sdks/java/io/hadoop-common/pom.xml              |   2 +-
 sdks/java/io/hadoop-file-system/pom.xml         |  33 +-
 sdks/java/io/hadoop/input-format/pom.xml        |   2 +-
 .../hadoop/inputformat/HadoopInputFormatIO.java |   2 +-
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml        |   4 +-
 .../inputformat/HIFIOWithElasticTest.java       |  11 +-
 sdks/java/io/hadoop/pom.xml                     |   2 +-
 sdks/java/io/hbase/pom.xml                      |  26 +-
 .../io/hbase/HBaseCoderProviderRegistrar.java   |  40 --
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   |  48 +-
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |  42 --
 .../hbase/HBaseCoderProviderRegistrarTest.java  |  45 --
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  49 +-
 sdks/java/io/hcatalog/pom.xml                   | 175 -----
 .../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 492 -------------
 .../beam/sdk/io/hcatalog/package-info.java      |  22 -
 .../io/hcatalog/EmbeddedMetastoreService.java   |  87 ---
 .../beam/sdk/io/hcatalog/HCatalogIOTest.java    | 277 --------
 .../sdk/io/hcatalog/HCatalogIOTestUtils.java    | 108 ---
 .../hcatalog/src/test/resources/hive-site.xml   | 301 --------
 sdks/java/io/jdbc/pom.xml                       |   4 +-
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     |   2 +-
 sdks/java/io/jms/pom.xml                        |   2 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |   2 +-
 sdks/java/io/kafka/pom.xml                      |   2 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 132 ++--
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |  30 -
 sdks/java/io/kinesis/pom.xml                    |   2 +-
 .../sdk/io/kinesis/CheckpointGenerator.java     |   6 +-
 .../beam/sdk/io/kinesis/CustomOptional.java     | 111 ++-
 .../io/kinesis/DynamicCheckpointGenerator.java  |  52 +-
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |  49 +-
 .../sdk/io/kinesis/KinesisClientProvider.java   |   4 +-
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   | 281 ++++----
 .../beam/sdk/io/kinesis/KinesisReader.java      | 206 +++---
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  97 ++-
 .../beam/sdk/io/kinesis/KinesisRecord.java      | 177 +++--
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  68 +-
 .../beam/sdk/io/kinesis/KinesisSource.java      | 147 ++--
 .../beam/sdk/io/kinesis/RecordFilter.java       |  18 +-
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  37 +-
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    | 241 ++++---
 .../sdk/io/kinesis/ShardRecordsIterator.java    | 106 ++-
 .../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 +++---
 .../beam/sdk/io/kinesis/StartingPoint.java      |  84 ++-
 .../io/kinesis/StaticCheckpointGenerator.java   |  27 +-
 .../io/kinesis/TransientKinesisException.java   |   7 +-
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  | 539 +++++++-------
 .../beam/sdk/io/kinesis/CustomOptionalTest.java |  27 +-
 .../kinesis/DynamicCheckpointGeneratorTest.java |  33 +-
 .../sdk/io/kinesis/KinesisMockReadTest.java     |  97 ++-
 .../io/kinesis/KinesisReaderCheckpointTest.java |  52 +-
 .../beam/sdk/io/kinesis/KinesisReaderIT.java    | 127 ++--
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  | 166 +++--
 .../sdk/io/kinesis/KinesisRecordCoderTest.java  |  34 +-
 .../beam/sdk/io/kinesis/KinesisTestOptions.java |  43 +-
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  70 +-
 .../beam/sdk/io/kinesis/RecordFilterTest.java   |  52 +-
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  42 +-
 .../sdk/io/kinesis/ShardCheckpointTest.java     | 203 +++---
 .../io/kinesis/ShardRecordsIteratorTest.java    | 216 +++---
 .../io/kinesis/SimplifiedKinesisClientTest.java | 351 +++++----
 sdks/java/io/mongodb/pom.xml                    |   2 +-
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    |   2 +-
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   | 317 ++-------
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      |  37 -
 sdks/java/io/mqtt/pom.xml                       |   2 +-
 .../org/apache/beam/sdk/io/mqtt/MqttIO.java     |   2 +-
 sdks/java/io/pom.xml                            |  35 +-
 sdks/java/io/xml/pom.xml                        |   2 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |  21 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |   4 +-
 sdks/java/java8tests/pom.xml                    |   2 +-
 sdks/java/javadoc/pom.xml                       |  19 +-
 .../maven-archetypes/examples-java8/pom.xml     |   2 +-
 .../main/resources/archetype-resources/pom.xml  |   1 +
 sdks/java/maven-archetypes/examples/pom.xml     |   2 +-
 .../main/resources/archetype-resources/pom.xml  |   1 +
 sdks/java/maven-archetypes/pom.xml              |   2 +-
 sdks/java/maven-archetypes/starter/pom.xml      |   2 +-
 .../resources/projects/basic/reference/pom.xml  |   2 +-
 sdks/java/pom.xml                               |   2 +-
 sdks/pom.xml                                    |   2 +-
 sdks/python/apache_beam/coders/coder_impl.py    |   4 -
 sdks/python/apache_beam/coders/coders.py        |   7 +-
 .../apache_beam/coders/coders_test_common.py    |   8 -
 .../examples/snippets/snippets_test.py          |  16 -
 .../apache_beam/examples/streaming_wordcount.py |  25 +-
 .../apache_beam/examples/windowed_wordcount.py  |  93 ---
 sdks/python/apache_beam/io/filesystem.py        |  22 +-
 sdks/python/apache_beam/io/gcp/gcsio.py         |  10 +-
 sdks/python/apache_beam/io/gcp/pubsub.py        | 180 ++---
 sdks/python/apache_beam/io/gcp/pubsub_test.py   | 101 +--
 .../io/gcp/tests/bigquery_matcher.py            |   6 +-
 .../io/gcp/tests/bigquery_matcher_test.py       |   2 +-
 sdks/python/apache_beam/io/range_trackers.py    | 130 ++++
 .../apache_beam/io/range_trackers_test.py       | 186 +++++
 .../apache_beam/options/pipeline_options.py     |  35 +-
 .../options/pipeline_options_test.py            |  39 +-
 .../apache_beam/options/value_provider_test.py  |  93 ++-
 sdks/python/apache_beam/pipeline.py             | 230 +-----
 sdks/python/apache_beam/pipeline_test.py        |  53 --
 sdks/python/apache_beam/portability/__init__.py |  18 -
 .../apache_beam/portability/api/__init__.py     |  21 -
 sdks/python/apache_beam/pvalue.py               |   2 +-
 sdks/python/apache_beam/runners/api/__init__.py |  21 +
 .../runners/dataflow/dataflow_runner.py         |  91 +--
 .../runners/dataflow/dataflow_runner_test.py    |  24 +-
 .../runners/dataflow/internal/apiclient.py      |  35 +-
 .../runners/dataflow/internal/apiclient_test.py |  29 +-
 .../runners/dataflow/internal/dependency.py     |  69 +-
 .../runners/dataflow/native_io/iobase_test.py   |  39 +-
 .../dataflow/native_io/streaming_create.py      |  72 --
 .../runners/dataflow/ptransform_overrides.py    |  52 --
 .../runners/direct/bundle_factory.py            |   2 +-
 .../apache_beam/runners/direct/direct_runner.py | 108 ---
 .../runners/direct/evaluation_context.py        |  73 +-
 .../apache_beam/runners/direct/executor.py      | 135 ++--
 .../runners/direct/transform_evaluator.py       | 447 +-----------
 .../runners/direct/transform_result.py          |  41 ++
 sdks/python/apache_beam/runners/direct/util.py  |  67 --
 .../runners/direct/watermark_manager.py         | 100 +--
 .../apache_beam/runners/pipeline_context.py     |  19 +-
 .../runners/portability/fn_api_runner.py        | 306 ++++----
 .../runners/portability/fn_api_runner_test.py   |  31 +-
 .../runners/worker/bundle_processor.py          | 426 -----------
 .../apache_beam/runners/worker/data_plane.py    |  28 +-
 .../runners/worker/data_plane_test.py           |   2 +-
 .../apache_beam/runners/worker/log_handler.py   |   2 +-
 .../runners/worker/log_handler_test.py          |   2 +-
 .../runners/worker/operation_specs.py           |   9 +-
 .../apache_beam/runners/worker/operations.py    |   1 -
 .../apache_beam/runners/worker/sdk_worker.py    | 370 +++++++++-
 .../runners/worker/sdk_worker_main.py           |   2 +-
 .../runners/worker/sdk_worker_test.py           |  95 ++-
 sdks/python/apache_beam/testing/test_stream.py  |   5 -
 .../apache_beam/testing/test_stream_test.py     |  68 --
 sdks/python/apache_beam/transforms/combiners.py |   8 -
 .../apache_beam/transforms/combiners_test.py    |   7 +-
 sdks/python/apache_beam/transforms/core.py      | 102 ++-
 .../python/apache_beam/transforms/ptransform.py |  43 +-
 sdks/python/apache_beam/transforms/trigger.py   |  28 +-
 sdks/python/apache_beam/transforms/window.py    |   4 +-
 .../apache_beam/typehints/trivial_inference.py  |   3 +-
 .../typehints/trivial_inference_test.py         |   7 -
 sdks/python/apache_beam/utils/plugin.py         |  42 --
 sdks/python/apache_beam/utils/timestamp.py      |   5 -
 sdks/python/apache_beam/utils/urns.py           |   2 +-
 sdks/python/apache_beam/version.py              |   2 +-
 sdks/python/gen_protos.py                       |   2 +-
 sdks/python/pom.xml                             |   2 +-
 sdks/python/run_pylint.sh                       |   2 +-
 sdks/python/setup.py                            |   5 +-
 462 files changed, 10754 insertions(+), 21718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 36c5cc8..bd419a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,7 +25,7 @@ sdks/python/**/*.egg
 sdks/python/LICENSE
 sdks/python/NOTICE
 sdks/python/README.md
-sdks/python/apache_beam/portability/api/*pb2*.*
+sdks/python/apache_beam/runners/api/*pb2*.*
 
 # Ignore IntelliJ files.
 .idea/

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/common_job_properties.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/common_job_properties.groovy b/.test-infra/jenkins/common_job_properties.groovy
index 70534c6..6d4d68b 100644
--- a/.test-infra/jenkins/common_job_properties.groovy
+++ b/.test-infra/jenkins/common_job_properties.groovy
@@ -23,12 +23,11 @@
 class common_job_properties {
 
   // Sets common top-level job properties for website repository jobs.
-  static void setTopLevelWebsiteJobProperties(context,
-                                              String branch = 'asf-site') {
+  static void setTopLevelWebsiteJobProperties(context) {
     setTopLevelJobProperties(
             context,
             'beam-site',
-            branch,
+            'asf-site',
             'beam',
             30)
   }
@@ -265,10 +264,8 @@ class common_job_properties {
         shell('rm -rf PerfKitBenchmarker')
         // Clone appropriate perfkit branch
         shell('git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git')
-        // Install Perfkit benchmark requirements.
+        // Install job requirements.
         shell('pip install --user -r PerfKitBenchmarker/requirements.txt')
-        // Install job requirements for Python SDK.
-        shell('pip install --user -e sdks/python/[gcp,test]')
         // Launch performance test.
         shell("python PerfKitBenchmarker/pkb.py $pkbArgs")
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy
deleted file mode 100644
index 6a71bda..0000000
--- a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import common_job_properties
-
-// This job runs the Beam Python performance tests on PerfKit Benchmarker.
-job('beam_PerformanceTests_Python'){
-  // Set default Beam job properties.
-  common_job_properties.setTopLevelMainJobProperties(delegate)
-
-  // Run job in postcommit every 6 hours, don't trigger every push.
-  common_job_properties.setPostCommit(
-      delegate,
-      '0 */6 * * *',
-      false,
-      'commits@beam.apache.org')
-
-  // Allows triggering this build against pull requests.
-  common_job_properties.enablePhraseTriggeringFromPullRequest(
-      delegate,
-      'Python SDK Performance Test',
-      'Run Python Performance Test')
-
-  def pipelineArgs = [
-      project: 'apache-beam-testing',
-      staging_location: 'gs://temp-storage-for-end-to-end-tests/staging-it',
-      temp_location: 'gs://temp-storage-for-end-to-end-tests/temp-it',
-      output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output'
-  ]
-  def pipelineArgList = []
-  pipelineArgs.each({
-    key, value -> pipelineArgList.add("--$key=$value")
-  })
-  def pipelineArgsJoined = pipelineArgList.join(',')
-
-  def argMap = [
-      beam_sdk : 'python',
-      benchmarks: 'beam_integration_benchmark',
-      beam_it_args: pipelineArgsJoined
-  ]
-
-  common_job_properties.buildPerformanceTest(delegate, argMap)
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy
index df0a2c7..f23e741 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy
@@ -37,8 +37,6 @@ matrixJob('beam_PostCommit_Java_JDK_Versions_Test') {
   common_job_properties.setPostCommit(
       delegate,
       '0 */6 * * *',
-      false,
-      '',  // TODO: Remove last two args once test is stable again.
       false)
 
   // Allows triggering this build against pull requests.

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy
index 6ef272c..f781b4e 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy
@@ -32,8 +32,7 @@ mavenJob('beam_PostCommit_Java_MavenInstall_Windows') {
   common_job_properties.setMavenConfig(delegate, 'Maven 3.3.3 (Windows)')
 
   // Sets that this is a PostCommit job.
-  // TODO(BEAM-1042, BEAM-1045, BEAM-2269, BEAM-2299) Turn notifications back on once fixed.
-  common_job_properties.setPostCommit(delegate, '0 */6 * * *', false, '', false)
+  common_job_properties.setPostCommit(delegate, '0 */6 * * *', false)
 
   // Allows triggering this build against pull requests.
   common_job_properties.enablePhraseTriggeringFromPullRequest(

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy b/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
deleted file mode 100644
index 0e2ae3f..0000000
--- a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import common_job_properties
-
-// Defines a job.
-job('beam_PreCommit_Website_Merge') {
-  description('Runs website tests for mergebot.')
-
-  // Set common parameters.
-  common_job_properties.setTopLevelWebsiteJobProperties(delegate, 'mergebot')
-
-  triggers {
-    githubPush()
-  }
-
-  steps {
-    // Run the following shell script as a build step.
-    shell '''
-        # Install RVM per instructions at https://rvm.io/rvm/install.
-        RVM_GPG_KEY=409B6B1796C275462A1703113804BB82D39DC0E3
-        gpg --keyserver hkp://keys.gnupg.net --recv-keys $RVM_GPG_KEY
-            
-        \\curl -sSL https://get.rvm.io | bash
-        source /home/jenkins/.rvm/scripts/rvm
-
-        # Install Ruby.
-        RUBY_VERSION_NUM=2.3.0
-        rvm install ruby $RUBY_VERSION_NUM --autolibs=read-only
-
-        # Install Bundler gem
-        PATH=~/.gem/ruby/$RUBY_VERSION_NUM/bin:$PATH
-        GEM_PATH=~/.gem/ruby/$RUBY_VERSION_NUM/:$GEM_PATH
-        gem install bundler --user-install
-
-        # Install all needed gems.
-        bundle install --path ~/.gem/
-
-        # Build the new site and test it.
-        rm -fr ./content/
-        bundle exec rake test
-    '''.stripIndent().trim()
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index ae64a79..701e4fe 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -34,6 +34,10 @@
 
   <packaging>jar</packaging>
 
+  <properties>
+    <spark.version>1.6.2</spark.version>
+  </properties>
+
   <profiles>
 
     <!--
@@ -62,12 +66,6 @@
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-apex</artifactId>
           <scope>runtime</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
         <!--
           Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
@@ -97,12 +95,6 @@
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-flink_2.10</artifactId>
           <scope>runtime</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
       </dependencies>
     </profile>
@@ -124,11 +116,13 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 49865ba..5e6df9c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -17,12 +17,11 @@
  */
 package org.apache.beam.examples.common;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Verify.verifyNotNull;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -54,12 +53,22 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
 
   @Override
   public PDone expand(PCollection<String> input) {
+    // filenamePrefix may contain a directory and a filename component. Pull out only the filename
+    // component from that path for the PerWindowFiles.
+    String prefix = "";
     ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
-    TextIO.Write write =
-        TextIO.write()
-            .to(new PerWindowFiles(resource))
-            .withTempDirectory(resource.getCurrentDirectory())
-            .withWindowedWrites();
+    if (!resource.isDirectory()) {
+      prefix = verifyNotNull(
+          resource.getFilename(),
+          "A non-directory resource should have a non-null filename: %s",
+          resource);
+    }
+
+
+    TextIO.Write write = TextIO.write()
+        .to(resource.getCurrentDirectory())
+        .withFilenamePolicy(new PerWindowFiles(prefix))
+        .withWindowedWrites();
     if (numShards != null) {
       write = write.withNumShards(numShards);
     }
@@ -74,36 +83,31 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
    */
   public static class PerWindowFiles extends FilenamePolicy {
 
-    private final ResourceId baseFilename;
+    private final String prefix;
 
-    public PerWindowFiles(ResourceId baseFilename) {
-      this.baseFilename = baseFilename;
+    public PerWindowFiles(String prefix) {
+      this.prefix = prefix;
     }
 
     public String filenamePrefixForWindow(IntervalWindow window) {
-      String prefix =
-          baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
       return String.format("%s-%s-%s",
           prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
     }
 
     @Override
-    public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext context, String extension) {
       IntervalWindow window = (IntervalWindow) context.getWindow();
-      String filename =
-          String.format(
-              "%s-%s-of-%s%s",
-              filenamePrefixForWindow(window),
-              context.getShardNumber(),
-              context.getNumShards(),
-              outputFileHints.getSuggestedFilenameSuffix());
-      return baseFilename
-          .getCurrentDirectory()
-          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+      String filename = String.format(
+          "%s-%s-of-%s%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context context, String extension) {
       throw new UnsupportedOperationException("Unsupported.");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index bec7952..eb7e4c4 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -150,8 +149,7 @@ public class WindowedWordCountIT {
 
     String outputPrefix = options.getOutput();
 
-    PerWindowFiles filenamePolicy =
-        new PerWindowFiles(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
+    PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix);
 
     List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index 6fd29a4..56295a4 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -35,6 +35,10 @@
 
   <packaging>jar</packaging>
 
+  <properties>
+    <spark.version>1.6.2</spark.version>
+  </properties>
+
   <profiles>
     <!--
       The direct runner is available by default.
@@ -62,12 +66,6 @@
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-apex</artifactId>
           <scope>runtime</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
         <!--
           Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
@@ -97,12 +95,6 @@
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-flink_2.10</artifactId>
           <scope>runtime</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
       </dependencies>
     </profile>
@@ -124,11 +116,13 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
index 1d60198..e6c8ddb 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -18,6 +18,7 @@
 package org.apache.beam.examples.complete.game.utils;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -27,7 +28,6 @@ import java.util.TimeZone;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -111,12 +111,21 @@ public class WriteToText<InputT>
       checkArgument(
           input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());
 
+      // filenamePrefix may contain a directory and a filename component. Pull out only the filename
+      // component from that path for the PerWindowFiles.
+      String prefix = "";
       ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+      if (!resource.isDirectory()) {
+        prefix = verifyNotNull(
+            resource.getFilename(),
+            "A non-directory resource should have a non-null filename: %s",
+            resource);
+      }
 
       return input.apply(
           TextIO.write()
-              .to(new PerWindowFiles(resource))
-              .withTempDirectory(resource.getCurrentDirectory())
+              .to(resource.getCurrentDirectory())
+              .withFilenamePolicy(new PerWindowFiles(prefix))
               .withWindowedWrites()
               .withNumShards(3));
     }
@@ -130,33 +139,31 @@ public class WriteToText<InputT>
    */
   protected static class PerWindowFiles extends FilenamePolicy {
 
-    private final ResourceId prefix;
+    private final String prefix;
 
-    public PerWindowFiles(ResourceId prefix) {
+    public PerWindowFiles(String prefix) {
       this.prefix = prefix;
     }
 
     public String filenamePrefixForWindow(IntervalWindow window) {
-      String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
-      return String.format(
-          "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
+      return String.format("%s-%s-%s",
+          prefix, formatter.print(window.start()), formatter.print(window.end()));
     }
 
     @Override
-    public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext context, String extension) {
       IntervalWindow window = (IntervalWindow) context.getWindow();
-      String filename =
-          String.format(
-              "%s-%s-of-%s%s",
-              filenamePrefixForWindow(window),
-              context.getShardNumber(),
-              context.getNumShards(),
-              outputFileHints.getSuggestedFilenameSuffix());
-      return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+      String filename = String.format(
+          "%s-%s-of-%s%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context context, String extension) {
       throw new UnsupportedOperationException("Unsupported.");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 611e2b3..745c210 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -276,8 +276,6 @@ public class LeaderBoardTest implements Serializable {
         .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
             event(TestUser.BLUE_TWO, 3, Duration.ZERO),
             event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
-        // Move the watermark to the end of the window to output on time
-        .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION))
         // Move the watermark past the end of the allowed lateness plus the end of the window
         .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
             .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 51f4c35..a7e61dd 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 987760f..1d8d4b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
   <url>http://beam.apache.org/</url>
   <inceptionYear>2016</inceptionYear>
 
-  <version>2.2.0-SNAPSHOT</version>
+  <version>2.1.0-SNAPSHOT</version>
 
   <licenses>
     <license>
@@ -101,14 +101,13 @@
     <beamSurefireArgline />
 
     <!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
-    <apache.commons.compress.version>1.14</apache.commons.compress.version>
-    <apache.commons.lang.version>3.6</apache.commons.lang.version>
-    <apache.commons.text.version>1.1</apache.commons.text.version>
+    <apache.commons.lang.version>3.5</apache.commons.lang.version>
+    <apache.commons.compress.version>1.9</apache.commons.compress.version>
     <apex.kryo.version>2.24.0</apex.kryo.version>
     <api-common.version>1.0.0-rc2</api-common.version>
     <avro.version>1.8.2</avro.version>
     <bigquery.version>v2-rev295-1.22.0</bigquery.version>
-    <bigtable.version>0.9.7.1</bigtable.version>
+    <bigtable.version>0.9.6.2</bigtable.version>
     <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
     <pubsubgrpc.version>0.1.0</pubsubgrpc.version>
     <clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
@@ -127,14 +126,8 @@
     <guava.version>20.0</guava.version>
     <grpc.version>1.2.0</grpc.version>
     <grpc-google-common-protos.version>0.1.9</grpc-google-common-protos.version>
-    <!--
-      This is the version of Hadoop used to compile the module that depend on Hadoop.
-      This dependency is defined with a provided scope.
-      Users must supply their own Hadoop version at runtime.
-    -->
-    <hadoop.version>2.7.3</hadoop.version>
     <hamcrest.version>1.3</hamcrest.version>
-    <jackson.version>2.8.9</jackson.version>
+    <jackson.version>2.8.8</jackson.version>
     <findbugs.version>3.0.1</findbugs.version>
     <joda.version>2.4</joda.version>
     <junit.version>4.12</junit.version>
@@ -144,8 +137,8 @@
     <protobuf.version>3.2.0</protobuf.version>
     <pubsub.version>v1-rev10-1.22.0</pubsub.version>
     <slf4j.version>1.7.14</slf4j.version>
-    <spanner.version>0.20.0-beta</spanner.version>
-    <spark.version>1.6.3</spark.version>
+    <spanner.version>0.16.0-beta</spanner.version>
+    <spark.version>1.6.2</spark.version>
     <spring.version>4.3.5.RELEASE</spring.version>
     <stax2.version>3.1.4</stax2.version>
     <storage.version>v1-rev71-1.22.0</storage.version>
@@ -159,7 +152,7 @@
     <failsafe-plugin.version>2.20</failsafe-plugin.version>
     <maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
     <maven-dependency-plugin.version>3.0.1</maven-dependency-plugin.version>
-    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
+    <maven-exec-plugin.version>1.4.0</maven-exec-plugin.version>
     <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
     <maven-resources-plugin.version>3.0.2</maven-resources-plugin.version>
     <maven-shade-plugin.version>3.0.0</maven-shade-plugin.version>
@@ -167,7 +160,6 @@
     <compiler.error.flag>-Werror</compiler.error.flag>
     <compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag>
     <compiler.default.exclude>nothing</compiler.default.exclude>
-    <gax-grpc.version>0.20.0</gax-grpc.version>
   </properties>
 
   <packaging>pom</packaging>
@@ -429,18 +421,6 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-java-io-amqp</artifactId>
-        <version>${project.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-java-io-cassandra</artifactId>
-        <version>${project.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -478,12 +458,6 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-java-io-hcatalog</artifactId>
-        <version>${project.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-jdbc</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -538,13 +512,6 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
-        <artifactId>beam-runners-core-java</artifactId>
-        <version>${project.version}</version>
-        <type>test-jar</type>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-direct-java</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -598,12 +565,6 @@
       </dependency>
 
       <dependency>
-        <groupId>org.apache.commons</groupId>
-        <artifactId>commons-text</artifactId>
-        <version>${apache.commons.text.version}</version>
-      </dependency>
-
-      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-all</artifactId>
         <version>${grpc.version}</version>
@@ -664,12 +625,6 @@
       </dependency>
 
       <dependency>
-        <groupId>com.google.api</groupId>
-        <artifactId>gax-grpc</artifactId>
-        <version>${gax-grpc.version}</version>
-      </dependency>
-
-      <dependency>
         <groupId>com.google.api-client</groupId>
         <artifactId>google-api-client</artifactId>
         <version>${google-clients.version}</version>
@@ -884,11 +839,6 @@
       </dependency>
 
       <dependency>
-        <groupId>com.google.cloud</groupId>
-        <artifactId>google-cloud-core-grpc</artifactId>
-        <version>${grpc.version}</version>
-      </dependency>
-      <dependency>
         <groupId>com.google.cloud.bigtable</groupId>
         <artifactId>bigtable-protos</artifactId>
         <version>${bigtable.version}</version>
@@ -1100,42 +1050,6 @@
         <version>${snappy-java.version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-client</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-common</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-mapreduce-client-core</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.spark</groupId>
-        <artifactId>spark-core_2.10</artifactId>
-        <version>${spark.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.spark</groupId>
-        <artifactId>spark-streaming_2.10</artifactId>
-        <version>${spark.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.spark</groupId>
-        <artifactId>spark-network-common_2.10</artifactId>
-        <version>${spark.version}</version>
-      </dependency>
-
       <!-- Testing -->
 
       <dependency>
@@ -1205,27 +1119,6 @@
         <scope>test</scope>
       </dependency>
 
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-minicluster</artifactId>
-        <version>${hadoop.version}</version>
-        <scope>test</scope>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-hdfs</artifactId>
-        <version>${hadoop.version}</version>
-        <scope>test</scope>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-hdfs</artifactId>
-        <version>${hadoop.version}</version>
-        <classifier>tests</classifier>
-        <scope>test</scope>
-      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index fd5aafb..4a36bec 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -75,13 +75,6 @@
       <artifactId>apex-engine</artifactId>
       <version>${apex.core.version}</version>
       <scope>runtime</scope>
-      <exclusions>
-        <!-- Fix build on JDK-9 -->
-        <exclusion>
-          <groupId>jdk.tools</groupId>
-          <artifactId>jdk.tools</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <!-- Beam -->
@@ -191,13 +184,6 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-java</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 
   <build>
@@ -263,12 +249,12 @@
             <configuration>
               <ignoredUsedUndeclaredDependencies>
                 <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:${apex.core.version}</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::${apache.commons.lang.version}</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.4</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::${apex.kryo.version}</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.3.0</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:${hadoop.version}</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:20.0</ignoredUsedUndeclaredDependency>
               </ignoredUsedUndeclaredDependencies>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index fd0a1c9..c595b3f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -62,6 +62,8 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.AsIterable;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -212,7 +214,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
    * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
    */
   public static class CreateApexPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
     private static final long serialVersionUID = 1L;
     private PCollectionView<ViewT> view;
 
@@ -226,13 +228,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollection<ElemT> expand(PCollection<ElemT> input) {
-      return PCollection.<ElemT>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
-    }
-
-    public PCollectionView<ViewT> getView() {
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
       return view;
     }
   }
@@ -245,7 +241,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   private static class StreamingWrapSingletonInList<T>
-      extends PTransform<PCollection<T>, PCollection<T>> {
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
     private static final long serialVersionUID = 1L;
     CreatePCollectionView<T, T> transform;
 
@@ -258,11 +254,10 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollection<T> expand(PCollection<T> input) {
-      input
+    public PCollectionView<T> expand(PCollection<T> input) {
+      return input
           .apply(ParDo.of(new WrapAsList<T>()))
-          .apply(CreateApexPCollectionView.<List<T>, T>of(transform.getView()));
-      return input;
+          .apply(CreateApexPCollectionView.<T, T>of(transform.getView()));
     }
 
     @Override
@@ -272,12 +267,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
     static class Factory<T>
         extends SingleInputOutputOverrideFactory<
-            PCollection<T>, PCollection<T>,
+            PCollection<T>, PCollectionView<T>,
             CreatePCollectionView<T, T>> {
       @Override
-      public PTransformReplacement<PCollection<T>, PCollection<T>> getReplacementTransform(
-          AppliedPTransform<PCollection<T>, PCollection<T>, CreatePCollectionView<T, T>>
-              transform) {
+      public PTransformReplacement<PCollection<T>, PCollectionView<T>>
+          getReplacementTransform(
+              AppliedPTransform<
+                      PCollection<T>, PCollectionView<T>,
+                      CreatePCollectionView<T, T>>
+                  transform) {
         return PTransformReplacement.of(
             PTransformReplacements.getSingletonMainInput(transform),
             new StreamingWrapSingletonInList<>(transform.getTransform()));
@@ -286,19 +284,18 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   private static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollection<T>> {
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
     private static final long serialVersionUID = 1L;
-    private final PCollectionView<Iterable<T>> view;
 
-    private StreamingViewAsIterable(PCollectionView<Iterable<T>> view) {
-      this.view = view;
-    }
+    private StreamingViewAsIterable() {}
 
     @Override
-    public PCollection<T> expand(PCollection<T> input) {
-      return ((PCollection<T>)
-              input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()))
-          .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view));
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateApexPCollectionView.<T, Iterable<T>> of(view));
     }
 
     @Override
@@ -308,17 +305,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
     static class Factory<T>
         extends SingleInputOutputOverrideFactory<
-            PCollection<T>, PCollection<T>, CreatePCollectionView<T, Iterable<T>>> {
+            PCollection<T>, PCollectionView<Iterable<T>>, View.AsIterable<T>> {
       @Override
-      public PTransformReplacement<PCollection<T>, PCollection<T>>
+      public PTransformReplacement<PCollection<T>, PCollectionView<Iterable<T>>>
           getReplacementTransform(
-              AppliedPTransform<
-                      PCollection<T>, PCollection<T>,
-                      CreatePCollectionView<T, Iterable<T>>>
+              AppliedPTransform<PCollection<T>, PCollectionView<Iterable<T>>, AsIterable<T>>
                   transform) {
         return PTransformReplacement.of(
             PTransformReplacements.getSingletonMainInput(transform),
-            new StreamingViewAsIterable<T>(transform.getTransform().getView()));
+            new StreamingViewAsIterable<T>());
       }
     }
   }
@@ -381,7 +376,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
         AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
           transform) {
       return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
-          SplittableParDo.forJavaParDo(transform.getTransform()));
+          new SplittableParDo<>(transform.getTransform()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 02f53ec..bda074b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,6 +154,7 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
           unboundedSource, true, context.getPipelineOptions());
       context.addOperator(operator, operator.output);
     }
+
   }
 
   private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
@@ -160,10 +162,11 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void translate(
-        CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context) {
-      context.addView(transform.getView());
-      LOG.debug("view {}", transform.getView().getName());
+    public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
+        TranslationContext context) {
+      PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
+      context.addView(view);
+      LOG.debug("view {}", view.getName());
     }
   }
 
@@ -174,8 +177,9 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
     @Override
     public void translate(
         CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
-      context.addView(transform.getView());
-      LOG.debug("view {}", transform.getView().getName());
+      PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
+      context.addView(view);
+      LOG.debug("view {}", view.getName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 94d13e1..aff3863 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -34,7 +34,6 @@ import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -94,8 +93,7 @@ class TranslationContext {
   }
 
   public <InputT extends PValue> InputT getInput() {
-    return (InputT)
-        Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
+    return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values());
   }
 
   public Map<TupleTag<?>, PValue> getOutputs() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index c3cbab2..809ca2a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -359,7 +359,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       }
     }
     if (sideInputs.isEmpty()) {
-      outputWatermark(mark);
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", mark);
+      }
+      output.emit(mark);
       return;
     }
 
@@ -367,20 +370,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
         Math.min(pushedBackWatermark.get(), currentInputWatermark);
     if (potentialOutputWatermark > currentOutputWatermark) {
       currentOutputWatermark = potentialOutputWatermark;
-      outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
-    }
-  }
-
-  private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
-    if (traceTuples) {
-      LOG.debug("\nemitting {}\n", mark);
-    }
-    output.emit(mark);
-    if (!additionalOutputPortMapping.isEmpty()) {
-      for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput :
-          additionalOutputPortMapping.values()) {
-        additionalOutput.emit(mark);
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
       }
+      output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index ba75746..e76096e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -123,15 +123,11 @@ public class WordCountTest {
     options.setInputFile(new File(inputFile).getAbsolutePath());
     String outputFilePrefix = "target/wordcountresult.txt";
     options.setOutput(outputFilePrefix);
+    WordCountTest.main(TestPipeline.convertToArgs(options));
 
     File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
     File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
-    Assert.assertTrue(!outFile1.exists() || outFile1.delete());
-    Assert.assertTrue(!outFile2.exists() || outFile2.delete());
-
-    WordCountTest.main(TestPipeline.convertToArgs(options));
-
-    Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists());
+    Assert.assertTrue(outFile1.exists() && outFile2.exists());
     HashSet<String> results = new HashSet<>();
     results.addAll(FileUtils.readLines(outFile1));
     results.addAll(FileUtils.readLines(outFile2));