You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/30 16:59:32 UTC

[3/4] beam git commit: [BEAM-2114] Tests for KafkaIO: use ExpectedException rule

[BEAM-2114] Tests for KafkaIO: use ExpectedException rule


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34e00465
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34e00465
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34e00465

Branch: refs/heads/master
Commit: 34e0046512282872f205166162b16b616f834e93
Parents: 10fc5f8
Author: peay <pe...@protonmail.com>
Authored: Sun Apr 30 18:22:38 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:45:30 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 33 +++++++++++++-------
 1 file changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/34e00465/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 2f895fe..591c099 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -102,7 +102,6 @@ import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -240,8 +239,8 @@ public class KafkaIOTest {
   }
 
   /**
-   * Creates a consumer with two topics, with 5 partitions each.
-   * numElements are (round-robin) assigned all the 10 partitions.
+   * Creates a consumer with two topics, with 10 partitions each.
+   * numElements are assigned (round-robin) to all 20 partitions.
    */
   private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       int numElements,
@@ -266,8 +265,9 @@ public class KafkaIOTest {
   }
 
   /**
-   * Creates a consumer with two topics, with 5 partitions each.
-   * numElements are (round-robin) assigned all the 10 partitions.
+   * Creates a consumer with two topics, with 10 partitions each.
+   * numElements are assigned (round-robin) to all 20 partitions.
+   * Coders are specified explicitly.
    */
   private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders(
           int numElements,
@@ -918,17 +918,22 @@ public class KafkaIOTest {
     }
   }
 
+  // class for which a coder cannot be inferred
+  private static class NonInferableObject {
+
+  }
+
   // class for testing coder inference
-  private static class ObjectDeserializer
-          implements Deserializer<Object> {
+  private static class NonInferableObjectDeserializer
+          implements Deserializer<NonInferableObject> {
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
     }
 
     @Override
-    public Object deserialize(String topic, byte[] bytes) {
-      return new Object();
+    public NonInferableObject deserialize(String topic, byte[] bytes) {
+      return new NonInferableObject();
     }
 
     @Override
@@ -953,10 +958,14 @@ public class KafkaIOTest {
             instanceof VarLongCoder);
   }
 
-  @Test(expected = RuntimeException.class)
-  public void testInferKeyCoderFailure() {
+  @Rule public ExpectedException cannotInferException = ExpectedException.none();
+
+  @Test
+  public void testInferKeyCoderFailure() throws Exception {
+    cannotInferException.expect(RuntimeException.class);
+
     CoderRegistry registry = CoderRegistry.createDefault();
-    KafkaIO.inferCoder(registry, ObjectDeserializer.class);
+    KafkaIO.inferCoder(registry, NonInferableObjectDeserializer.class);
   }
 
   @Test