You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/06/21 19:58:43 UTC

[1/2] beam git commit: Fix minor issues on HCatalogIO

Repository: beam
Updated Branches:
  refs/heads/master 2d25b6840 -> 28c6fd42e


Fix minor issues on HCatalogIO

- Restrict access level when possible
- Rename Filter to Partition in the write transform, for consistency with the HCatalog API
- Improve test coverage
- Fix documentation details
- Implement TearDown method for the writer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c11f0ff5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c11f0ff5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c11f0ff5

Branch: refs/heads/master
Commit: c11f0ff57efca5786fb5da20006d9eb96b44cffe
Parents: 2d25b68
Author: Ismaël Mejía <ie...@apache.org>
Authored: Fri Jun 9 00:01:55 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed Jun 21 21:58:14 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 113 ++++++++-----------
 .../io/hcatalog/EmbeddedMetastoreService.java   |   3 +-
 .../beam/sdk/io/hcatalog/HCatalogIOTest.java    |  54 +++++----
 .../sdk/io/hcatalog/HCatalogIOTestUtils.java    |  22 ++--
 4 files changed, 90 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 07b56e3..1549dab 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -78,11 +78,10 @@ import org.slf4j.LoggerFactory;
  *
  * pipeline
  *   .apply(HCatalogIO.read()
- *       .withConfigProperties(configProperties) //mandatory
- *       .withTable("employee") //mandatory
+ *       .withConfigProperties(configProperties)
  *       .withDatabase("default") //optional, assumes default if none specified
- *       .withFilter(filterString) //optional,
- *       should be specified if the table is partitioned
+ *       .withTable("employee")
+ *       .withFilter(filterString) //optional, may be specified if the table is partitioned
  * }</pre>
  *
  * <h3>Writing using HCatalog</h3>
@@ -100,13 +99,11 @@ import org.slf4j.LoggerFactory;
  * pipeline
  *   .apply(...)
  *   .apply(HiveIO.write()
- *       .withConfigProperties(configProperties) //mandatory
- *       .withTable("employee") //mandatory
+ *       .withConfigProperties(configProperties)
  *       .withDatabase("default") //optional, assumes default if none specified
- *       .withFilter(partitionValues) //optional,
- *       should be specified if the table is partitioned
- *       .withBatchSize(1024L)) //optional,
- *       assumes a default batch size of 1024 if none specified
+ *       .withTable("employee")
+ *       .withPartition(partitionValues) //optional, may be specified if the table is partitioned
+ *       .withBatchSize(1024L)) //optional, assumes a default batch size of 1024 if none specified
  * }</pre>
  */
 @Experimental
@@ -114,14 +111,17 @@ public class HCatalogIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(HCatalogIO.class);
 
+  private static final long BATCH_SIZE = 1024L;
+  private static final String DEFAULT_DATABASE = "default";
+
   /** Write data to Hive. */
   public static Write write() {
-    return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(1024L).build();
+    return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(BATCH_SIZE).build();
   }
 
   /** Read data from Hive. */
   public static Read read() {
-    return new AutoValue_HCatalogIO_Read.Builder().setDatabase("default").build();
+    return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build();
   }
 
   private HCatalogIO() {}
@@ -130,44 +130,26 @@ public class HCatalogIO {
   @VisibleForTesting
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> {
-    @Nullable
-    abstract Map<String, String> getConfigProperties();
-
-    @Nullable
-    abstract String getDatabase();
-
-    @Nullable
-    abstract String getTable();
-
-    @Nullable
-    abstract String getFilter();
-
-    @Nullable
-    abstract ReaderContext getContext();
-
-    @Nullable
-    abstract Integer getSplitId();
-
+    @Nullable abstract Map<String, String> getConfigProperties();
+    @Nullable abstract String getDatabase();
+    @Nullable abstract String getTable();
+    @Nullable abstract String getFilter();
+    @Nullable abstract ReaderContext getContext();
+    @Nullable abstract Integer getSplitId();
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setConfigProperties(Map<String, String> configProperties);
-
       abstract Builder setDatabase(String database);
-
       abstract Builder setTable(String table);
-
       abstract Builder setFilter(String filter);
-
       abstract Builder setSplitId(Integer splitId);
-
       abstract Builder setContext(ReaderContext context);
-
       abstract Read build();
     }
 
-    /** Sets the configuration properties like metastore URI. This is mandatory */
+    /** Sets the configuration properties like metastore URI. */
     public Read withConfigProperties(Map<String, String> configProperties) {
       return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
     }
@@ -177,12 +159,12 @@ public class HCatalogIO {
       return toBuilder().setDatabase(database).build();
     }
 
-    /** Sets the table name to read from. This is mandatory */
+    /** Sets the table name to read from. */
     public Read withTable(String table) {
       return toBuilder().setTable(table).build();
     }
 
-    /** Sets the filter (partition) details. This is optional, assumes none if not specified */
+    /** Sets the filter details. This is optional, assumes none if not specified */
     public Read withFilter(String filter) {
       return toBuilder().setFilter(filter).build();
     }
@@ -220,7 +202,7 @@ public class HCatalogIO {
   /** A HCatalog {@link BoundedSource} reading {@link HCatRecord} from a given instance. */
   @VisibleForTesting
   static class BoundedHCatalogSource extends BoundedSource<HCatRecord> {
-    private Read spec;
+    private final Read spec;
 
     BoundedHCatalogSource(Read spec) {
       this.spec = spec;
@@ -367,38 +349,24 @@ public class HCatalogIO {
   /** A {@link PTransform} to write to a HCatalog managed source. */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<HCatRecord>, PDone> {
-    @Nullable
-    abstract Map<String, String> getConfigProperties();
-
-    @Nullable
-    abstract String getDatabase();
-
-    @Nullable
-    abstract String getTable();
-
-    @Nullable
-    abstract Map getFilter();
-
+    @Nullable abstract Map<String, String> getConfigProperties();
+    @Nullable abstract String getDatabase();
+    @Nullable abstract String getTable();
+    @Nullable abstract Map<String, String> getPartition();
     abstract long getBatchSize();
-
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setConfigProperties(Map<String, String> configProperties);
-
       abstract Builder setDatabase(String database);
-
       abstract Builder setTable(String table);
-
-      abstract Builder setFilter(Map partition);
-
+      abstract Builder setPartition(Map<String, String> partition);
       abstract Builder setBatchSize(long batchSize);
-
       abstract Write build();
     }
 
-    /** Sets the configuration properties like metastore URI. This is mandatory */
+    /** Sets the configuration properties like metastore URI. */
     public Write withConfigProperties(Map<String, String> configProperties) {
       return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
     }
@@ -408,14 +376,14 @@ public class HCatalogIO {
       return toBuilder().setDatabase(database).build();
     }
 
-    /** Sets the table name to write to, the table should exist beforehand. This is mandatory */
+    /** Sets the table name to write to, the table should exist beforehand. */
     public Write withTable(String table) {
       return toBuilder().setTable(table).build();
     }
 
-    /** Sets the filter (partition) details. This is required if the table is partitioned */
-    public Write withFilter(Map filter) {
-      return toBuilder().setFilter(filter).build();
+    /** Sets the partition details. */
+    public Write withPartition(Map<String, String> partition) {
+      return toBuilder().setPartition(partition).build();
     }
 
     /**
@@ -454,7 +422,7 @@ public class HCatalogIO {
         super.populateDisplayData(builder);
         builder.addIfNotNull(DisplayData.item("database", spec.getDatabase()));
         builder.add(DisplayData.item("table", spec.getTable()));
-        builder.addIfNotNull(DisplayData.item("filter", String.valueOf(spec.getFilter())));
+        builder.addIfNotNull(DisplayData.item("partition", String.valueOf(spec.getPartition())));
         builder.add(DisplayData.item("configProperties", spec.getConfigProperties().toString()));
         builder.add(DisplayData.item("batchSize", spec.getBatchSize()));
       }
@@ -465,7 +433,7 @@ public class HCatalogIO {
             new WriteEntity.Builder()
                 .withDatabase(spec.getDatabase())
                 .withTable(spec.getTable())
-                .withPartition(spec.getFilter())
+                .withPartition(spec.getPartition())
                 .build();
         masterWriter = DataTransferFactory.getHCatWriter(entity, spec.getConfigProperties());
         writerContext = masterWriter.prepareWrite();
@@ -506,6 +474,19 @@ public class HCatalogIO {
           hCatRecordsBatch.clear();
         }
       }
+
+      @Teardown
+      public void tearDown() throws Exception {
+        if (slaveWriter != null) {
+          slaveWriter = null;
+        }
+        if (masterWriter != null) {
+          masterWriter = null;
+        }
+        if (writerContext != null) {
+          writerContext = null;
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
index 5792bf6..31e5b1c 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
  * https://github.com/apache/hive/blob/master/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce
  * /HCatBaseTest.java </a>
  */
-public final class EmbeddedMetastoreService implements AutoCloseable {
+final class EmbeddedMetastoreService implements AutoCloseable {
   private final Driver driver;
   private final HiveConf hiveConf;
   private final SessionState sessionState;
@@ -57,7 +57,6 @@ public final class EmbeddedMetastoreService implements AutoCloseable {
     hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, testWarehouseDirPath);
-    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
     hiveConf.setVar(
         HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,

http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
index 49c538f..91671a5 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.io.hcatalog;
 
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_DATABASE;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_FILTER;
 import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE_NAME;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE;
 import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getConfigPropertiesAsMap;
 import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getExpectedRecords;
 import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getHCatRecords;
@@ -69,7 +71,7 @@ import org.junit.runners.model.Statement;
 
 /** Test for HCatalogIO. */
 public class HCatalogIOTest implements Serializable {
-  public static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
+  private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
 
   @ClassRule
   public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
@@ -103,12 +105,12 @@ public class HCatalogIOTest implements Serializable {
   /** Use this annotation to setup complete test data(table populated with records). */
   @Retention(RetentionPolicy.RUNTIME)
   @Target({ElementType.METHOD})
-  @interface NeedsTestData {}
+  private @interface NeedsTestData {}
 
   /** Use this annotation to setup test tables alone(empty tables, no records are populated). */
   @Retention(RetentionPolicy.RUNTIME)
   @Target({ElementType.METHOD})
-  @interface NeedsEmptyTestTables {}
+  private @interface NeedsEmptyTestTables {}
 
   @BeforeClass
   public static void setupEmbeddedMetastoreService () throws IOException {
@@ -117,7 +119,7 @@ public class HCatalogIOTest implements Serializable {
 
   @AfterClass
   public static void shutdownEmbeddedMetastoreService () throws Exception {
-    service.executeQuery("drop table " + TEST_TABLE_NAME);
+    service.executeQuery("drop table " + TEST_TABLE);
     service.close();
   }
 
@@ -130,23 +132,27 @@ public class HCatalogIOTest implements Serializable {
         .apply(
             HCatalogIO.write()
                 .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-                .withTable(TEST_TABLE_NAME));
+                .withDatabase(TEST_DATABASE)
+                .withTable(TEST_TABLE)
+                .withPartition(new java.util.HashMap<String, String>())
+                .withBatchSize(512L));
     defaultPipeline.run();
 
-    PCollection<String> output =
-        readAfterWritePipeline
-            .apply(
-                HCatalogIO.read()
-                    .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-                    .withTable(HCatalogIOTestUtils.TEST_TABLE_NAME))
-            .apply(
-                ParDo.of(
-                    new DoFn<HCatRecord, String>() {
-                      @ProcessElement
-                      public void processElement(ProcessContext c) {
-                        c.output(c.element().get(0).toString());
-                      }
-                    }));
+    PCollection<String> output = readAfterWritePipeline
+        .apply(
+            HCatalogIO.read()
+                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                .withDatabase(TEST_DATABASE)
+                .withTable(TEST_TABLE)
+                .withFilter(TEST_FILTER))
+        .apply(
+            ParDo.of(
+                new DoFn<HCatRecord, String>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element().get(0).toString());
+                  }
+                }));
     PAssert.that(output).containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT));
     readAfterWritePipeline.run();
   }
@@ -222,7 +228,7 @@ public class HCatalogIOTest implements Serializable {
         HCatalogIO.read()
             .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
             .withContext(context)
-            .withTable(TEST_TABLE_NAME);
+            .withTable(TEST_TABLE);
 
     List<String> records = new ArrayList<>();
     for (int i = 0; i < context.numSplits(); i++) {
@@ -246,7 +252,7 @@ public class HCatalogIOTest implements Serializable {
         HCatalogIO.read()
             .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
             .withContext(context)
-            .withTable(TEST_TABLE_NAME);
+            .withTable(TEST_TABLE);
 
     BoundedHCatalogSource source = new BoundedHCatalogSource(spec);
     List<BoundedSource<HCatRecord>> unSplitSource = source.split(-1, OPTIONS);
@@ -260,8 +266,8 @@ public class HCatalogIOTest implements Serializable {
   }
 
   private void reCreateTestTable() throws CommandNeedRetryException {
-    service.executeQuery("drop table " + TEST_TABLE_NAME);
-    service.executeQuery("create table " + TEST_TABLE_NAME + "(mycol1 string, mycol2 int)");
+    service.executeQuery("drop table " + TEST_TABLE);
+    service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 int)");
   }
 
   private void prepareTestData() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
index f66e0bc..ae1eb50 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
@@ -35,15 +35,16 @@ import org.apache.hive.hcatalog.data.transfer.WriteEntity;
 import org.apache.hive.hcatalog.data.transfer.WriterContext;
 
 /** Utility class for HCatalogIOTest. */
-public class HCatalogIOTestUtils {
-  public static final String TEST_TABLE_NAME = "mytable";
-
-  public static final int TEST_RECORDS_COUNT = 1000;
+class HCatalogIOTestUtils {
+  static final String TEST_DATABASE = "default";
+  static final String TEST_TABLE = "mytable";
+  static final String TEST_FILTER = "myfilter";
+  static final int TEST_RECORDS_COUNT = 1000;
 
   private static final ReadEntity READ_ENTITY =
-      new ReadEntity.Builder().withTable(TEST_TABLE_NAME).build();
+      new ReadEntity.Builder().withTable(TEST_TABLE).build();
   private static final WriteEntity WRITE_ENTITY =
-      new WriteEntity.Builder().withTable(TEST_TABLE_NAME).build();
+      new WriteEntity.Builder().withTable(TEST_TABLE).build();
 
   /** Returns a ReaderContext instance for the passed datastore config params. */
   static ReaderContext getReaderContext(Map<String, String> config) throws HCatException {
@@ -51,17 +52,18 @@ public class HCatalogIOTestUtils {
   }
 
   /** Returns a WriterContext instance for the passed datastore config params. */
-  static WriterContext getWriterContext(Map<String, String> config) throws HCatException {
+  private static WriterContext getWriterContext(Map<String, String> config) throws HCatException {
     return DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).prepareWrite();
   }
 
   /** Writes records to the table using the passed WriterContext. */
-  static void writeRecords(WriterContext context) throws HCatException {
+  private static void writeRecords(WriterContext context) throws HCatException {
     DataTransferFactory.getHCatWriter(context).write(getHCatRecords(TEST_RECORDS_COUNT).iterator());
   }
 
   /** Commits the pending writes to the database. */
-  static void commitRecords(Map<String, String> config, WriterContext context) throws IOException {
+  private static void commitRecords(Map<String, String> config, WriterContext context)
+      throws IOException {
     DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).commit(context);
   }
 
@@ -100,7 +102,7 @@ public class HCatalogIOTestUtils {
   }
 
   /** returns a DefaultHCatRecord instance for passed value. */
-  static DefaultHCatRecord toHCatRecord(int value) {
+  private static DefaultHCatRecord toHCatRecord(int value) {
     return new DefaultHCatRecord(Arrays.<Object>asList("record " + value, value));
   }
 }


[2/2] beam git commit: This closes #3412

Posted by ie...@apache.org.
This closes #3412


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/28c6fd42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/28c6fd42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/28c6fd42

Branch: refs/heads/master
Commit: 28c6fd42eb95fc44fbc038758c31499a9482514d
Parents: 2d25b68 c11f0ff
Author: Ismaël Mejía <ie...@apache.org>
Authored: Wed Jun 21 21:58:21 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed Jun 21 21:58:21 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 113 ++++++++-----------
 .../io/hcatalog/EmbeddedMetastoreService.java   |   3 +-
 .../beam/sdk/io/hcatalog/HCatalogIOTest.java    |  54 +++++----
 .../sdk/io/hcatalog/HCatalogIOTestUtils.java    |  22 ++--
 4 files changed, 90 insertions(+), 102 deletions(-)
----------------------------------------------------------------------