You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:16 UTC
[08/55] [abbrv] beam git commit: Refactor classes into packages
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
new file mode 100644
index 0000000..15e17a8
--- /dev/null
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.sources;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test UnboundedEventSource.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedEventSourceTest {
+ private GeneratorConfig makeConfig(long n) {
+ return new GeneratorConfig(
+ NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0);
+ }
+
+ /**
+ * Helper for tracking which ids we've seen (so we can detect dups) and
+ * confirming reading events match the model events.
+ */
+ private static class EventIdChecker {
+ private Set<Long> seenPersonIds = new HashSet<>();
+ private Set<Long> seenAuctionIds = new HashSet<>();
+
+ public void add(Event event) {
+ if (event.newAuction != null) {
+ assertTrue(seenAuctionIds.add(event.newAuction.id));
+ } else if (event.newPerson != null) {
+ assertTrue(seenPersonIds.add(event.newPerson.id));
+ }
+ }
+
+ public void add(int n, UnboundedReader<Event> reader, Generator modelGenerator)
+ throws IOException {
+ for (int i = 0; i < n; i++) {
+ assertTrue(modelGenerator.hasNext());
+ Event modelEvent = modelGenerator.next().getValue();
+ assertTrue(reader.advance());
+ Event actualEvent = reader.getCurrent();
+ assertEquals(modelEvent.toString(), actualEvent.toString());
+ add(actualEvent);
+ }
+ }
+ }
+
+ /**
+ * Check aggressively checkpointing and resuming a reader gives us exactly the
+ * same event stream as reading directly.
+ */
+ @Test
+ public void resumeFromCheckpoint() throws IOException {
+ Random random = new Random(297);
+ int n = 47293;
+ GeneratorConfig config = makeConfig(n);
+ Generator modelGenerator = new Generator(config);
+
+ EventIdChecker checker = new EventIdChecker();
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ Pipeline p = TestPipeline.create(options);
+ UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false);
+ UnboundedReader<Event> reader = source.createReader(options, null);
+
+ while (n > 0) {
+ int m = Math.min(459 + random.nextInt(455), n);
+ System.out.printf("reading %d...\n", m);
+ checker.add(m, reader, modelGenerator);
+ n -= m;
+ System.out.printf("splitting with %d remaining...\n", n);
+ CheckpointMark checkpointMark = reader.getCheckpointMark();
+ assertTrue(checkpointMark instanceof Generator.Checkpoint);
+ reader = source.createReader(options, (Generator.Checkpoint) checkpointMark);
+ }
+
+ assertFalse(reader.advance());
+ }
+}