You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/06/21 19:58:43 UTC
[1/2] beam git commit: Fix minor issues on HCatalogIO
Repository: beam
Updated Branches:
refs/heads/master 2d25b6840 -> 28c6fd42e
Fix minor issues on HCatalogIO
- Restrict access level when possible
- Rename Filter to Partition for the write to be coherent with the HCatalog API
- Improve test coverage
- Fix documentation details
- Implement TearDown method for the writer
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c11f0ff5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c11f0ff5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c11f0ff5
Branch: refs/heads/master
Commit: c11f0ff57efca5786fb5da20006d9eb96b44cffe
Parents: 2d25b68
Author: Ismaël Mejía <ie...@apache.org>
Authored: Fri Jun 9 00:01:55 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed Jun 21 21:58:14 2017 +0200
----------------------------------------------------------------------
.../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 113 ++++++++-----------
.../io/hcatalog/EmbeddedMetastoreService.java | 3 +-
.../beam/sdk/io/hcatalog/HCatalogIOTest.java | 54 +++++----
.../sdk/io/hcatalog/HCatalogIOTestUtils.java | 22 ++--
4 files changed, 90 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 07b56e3..1549dab 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -78,11 +78,10 @@ import org.slf4j.LoggerFactory;
*
* pipeline
* .apply(HCatalogIO.read()
- * .withConfigProperties(configProperties) //mandatory
- * .withTable("employee") //mandatory
+ * .withConfigProperties(configProperties)
* .withDatabase("default") //optional, assumes default if none specified
- * .withFilter(filterString) //optional,
- * should be specified if the table is partitioned
+ * .withTable("employee")
+ * .withFilter(filterString) //optional, may be specified if the table is partitioned
* }</pre>
*
* <h3>Writing using HCatalog</h3>
@@ -100,13 +99,11 @@ import org.slf4j.LoggerFactory;
* pipeline
* .apply(...)
* .apply(HiveIO.write()
- * .withConfigProperties(configProperties) //mandatory
- * .withTable("employee") //mandatory
+ * .withConfigProperties(configProperties)
* .withDatabase("default") //optional, assumes default if none specified
- * .withFilter(partitionValues) //optional,
- * should be specified if the table is partitioned
- * .withBatchSize(1024L)) //optional,
- * assumes a default batch size of 1024 if none specified
+ * .withTable("employee")
+ * .withPartition(partitionValues) //optional, may be specified if the table is partitioned
+ * .withBatchSize(1024L)) //optional, assumes a default batch size of 1024 if none specified
* }</pre>
*/
@Experimental
@@ -114,14 +111,17 @@ public class HCatalogIO {
private static final Logger LOG = LoggerFactory.getLogger(HCatalogIO.class);
+ private static final long BATCH_SIZE = 1024L;
+ private static final String DEFAULT_DATABASE = "default";
+
/** Write data to Hive. */
public static Write write() {
- return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(1024L).build();
+ return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(BATCH_SIZE).build();
}
/** Read data from Hive. */
public static Read read() {
- return new AutoValue_HCatalogIO_Read.Builder().setDatabase("default").build();
+ return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build();
}
private HCatalogIO() {}
@@ -130,44 +130,26 @@ public class HCatalogIO {
@VisibleForTesting
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> {
- @Nullable
- abstract Map<String, String> getConfigProperties();
-
- @Nullable
- abstract String getDatabase();
-
- @Nullable
- abstract String getTable();
-
- @Nullable
- abstract String getFilter();
-
- @Nullable
- abstract ReaderContext getContext();
-
- @Nullable
- abstract Integer getSplitId();
-
+ @Nullable abstract Map<String, String> getConfigProperties();
+ @Nullable abstract String getDatabase();
+ @Nullable abstract String getTable();
+ @Nullable abstract String getFilter();
+ @Nullable abstract ReaderContext getContext();
+ @Nullable abstract Integer getSplitId();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setConfigProperties(Map<String, String> configProperties);
-
abstract Builder setDatabase(String database);
-
abstract Builder setTable(String table);
-
abstract Builder setFilter(String filter);
-
abstract Builder setSplitId(Integer splitId);
-
abstract Builder setContext(ReaderContext context);
-
abstract Read build();
}
- /** Sets the configuration properties like metastore URI. This is mandatory */
+ /** Sets the configuration properties like metastore URI. */
public Read withConfigProperties(Map<String, String> configProperties) {
return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
}
@@ -177,12 +159,12 @@ public class HCatalogIO {
return toBuilder().setDatabase(database).build();
}
- /** Sets the table name to read from. This is mandatory */
+ /** Sets the table name to read from. */
public Read withTable(String table) {
return toBuilder().setTable(table).build();
}
- /** Sets the filter (partition) details. This is optional, assumes none if not specified */
+ /** Sets the filter details. This is optional, assumes none if not specified */
public Read withFilter(String filter) {
return toBuilder().setFilter(filter).build();
}
@@ -220,7 +202,7 @@ public class HCatalogIO {
/** A HCatalog {@link BoundedSource} reading {@link HCatRecord} from a given instance. */
@VisibleForTesting
static class BoundedHCatalogSource extends BoundedSource<HCatRecord> {
- private Read spec;
+ private final Read spec;
BoundedHCatalogSource(Read spec) {
this.spec = spec;
@@ -367,38 +349,24 @@ public class HCatalogIO {
/** A {@link PTransform} to write to a HCatalog managed source. */
@AutoValue
public abstract static class Write extends PTransform<PCollection<HCatRecord>, PDone> {
- @Nullable
- abstract Map<String, String> getConfigProperties();
-
- @Nullable
- abstract String getDatabase();
-
- @Nullable
- abstract String getTable();
-
- @Nullable
- abstract Map getFilter();
-
+ @Nullable abstract Map<String, String> getConfigProperties();
+ @Nullable abstract String getDatabase();
+ @Nullable abstract String getTable();
+ @Nullable abstract Map<String, String> getPartition();
abstract long getBatchSize();
-
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setConfigProperties(Map<String, String> configProperties);
-
abstract Builder setDatabase(String database);
-
abstract Builder setTable(String table);
-
- abstract Builder setFilter(Map partition);
-
+ abstract Builder setPartition(Map<String, String> partition);
abstract Builder setBatchSize(long batchSize);
-
abstract Write build();
}
- /** Sets the configuration properties like metastore URI. This is mandatory */
+ /** Sets the configuration properties like metastore URI. */
public Write withConfigProperties(Map<String, String> configProperties) {
return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
}
@@ -408,14 +376,14 @@ public class HCatalogIO {
return toBuilder().setDatabase(database).build();
}
- /** Sets the table name to write to, the table should exist beforehand. This is mandatory */
+ /** Sets the table name to write to, the table should exist beforehand. */
public Write withTable(String table) {
return toBuilder().setTable(table).build();
}
- /** Sets the filter (partition) details. This is required if the table is partitioned */
- public Write withFilter(Map filter) {
- return toBuilder().setFilter(filter).build();
+ /** Sets the partition details. */
+ public Write withPartition(Map<String, String> partition) {
+ return toBuilder().setPartition(partition).build();
}
/**
@@ -454,7 +422,7 @@ public class HCatalogIO {
super.populateDisplayData(builder);
builder.addIfNotNull(DisplayData.item("database", spec.getDatabase()));
builder.add(DisplayData.item("table", spec.getTable()));
- builder.addIfNotNull(DisplayData.item("filter", String.valueOf(spec.getFilter())));
+ builder.addIfNotNull(DisplayData.item("partition", String.valueOf(spec.getPartition())));
builder.add(DisplayData.item("configProperties", spec.getConfigProperties().toString()));
builder.add(DisplayData.item("batchSize", spec.getBatchSize()));
}
@@ -465,7 +433,7 @@ public class HCatalogIO {
new WriteEntity.Builder()
.withDatabase(spec.getDatabase())
.withTable(spec.getTable())
- .withPartition(spec.getFilter())
+ .withPartition(spec.getPartition())
.build();
masterWriter = DataTransferFactory.getHCatWriter(entity, spec.getConfigProperties());
writerContext = masterWriter.prepareWrite();
@@ -506,6 +474,19 @@ public class HCatalogIO {
hCatRecordsBatch.clear();
}
}
+
+ @Teardown
+ public void tearDown() throws Exception {
+ if (slaveWriter != null) {
+ slaveWriter = null;
+ }
+ if (masterWriter != null) {
+ masterWriter = null;
+ }
+ if (writerContext != null) {
+ writerContext = null;
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
index 5792bf6..31e5b1c 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
* https://github.com/apache/hive/blob/master/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce
* /HCatBaseTest.java </a>
*/
-public final class EmbeddedMetastoreService implements AutoCloseable {
+final class EmbeddedMetastoreService implements AutoCloseable {
private final Driver driver;
private final HiveConf hiveConf;
private final SessionState sessionState;
@@ -57,7 +57,6 @@ public final class EmbeddedMetastoreService implements AutoCloseable {
hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, testWarehouseDirPath);
- hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
hiveConf.setVar(
HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
index 49c538f..91671a5 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.io.hcatalog;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_DATABASE;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_FILTER;
import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE_NAME;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE;
import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getConfigPropertiesAsMap;
import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getExpectedRecords;
import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getHCatRecords;
@@ -69,7 +71,7 @@ import org.junit.runners.model.Statement;
/** Test for HCatalogIO. */
public class HCatalogIOTest implements Serializable {
- public static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
+ private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
@@ -103,12 +105,12 @@ public class HCatalogIOTest implements Serializable {
/** Use this annotation to setup complete test data(table populated with records). */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
- @interface NeedsTestData {}
+ private @interface NeedsTestData {}
/** Use this annotation to setup test tables alone(empty tables, no records are populated). */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
- @interface NeedsEmptyTestTables {}
+ private @interface NeedsEmptyTestTables {}
@BeforeClass
public static void setupEmbeddedMetastoreService () throws IOException {
@@ -117,7 +119,7 @@ public class HCatalogIOTest implements Serializable {
@AfterClass
public static void shutdownEmbeddedMetastoreService () throws Exception {
- service.executeQuery("drop table " + TEST_TABLE_NAME);
+ service.executeQuery("drop table " + TEST_TABLE);
service.close();
}
@@ -130,23 +132,27 @@ public class HCatalogIOTest implements Serializable {
.apply(
HCatalogIO.write()
.withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
- .withTable(TEST_TABLE_NAME));
+ .withDatabase(TEST_DATABASE)
+ .withTable(TEST_TABLE)
+ .withPartition(new java.util.HashMap<String, String>())
+ .withBatchSize(512L));
defaultPipeline.run();
- PCollection<String> output =
- readAfterWritePipeline
- .apply(
- HCatalogIO.read()
- .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
- .withTable(HCatalogIOTestUtils.TEST_TABLE_NAME))
- .apply(
- ParDo.of(
- new DoFn<HCatRecord, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().get(0).toString());
- }
- }));
+ PCollection<String> output = readAfterWritePipeline
+ .apply(
+ HCatalogIO.read()
+ .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+ .withDatabase(TEST_DATABASE)
+ .withTable(TEST_TABLE)
+ .withFilter(TEST_FILTER))
+ .apply(
+ ParDo.of(
+ new DoFn<HCatRecord, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().get(0).toString());
+ }
+ }));
PAssert.that(output).containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT));
readAfterWritePipeline.run();
}
@@ -222,7 +228,7 @@ public class HCatalogIOTest implements Serializable {
HCatalogIO.read()
.withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
.withContext(context)
- .withTable(TEST_TABLE_NAME);
+ .withTable(TEST_TABLE);
List<String> records = new ArrayList<>();
for (int i = 0; i < context.numSplits(); i++) {
@@ -246,7 +252,7 @@ public class HCatalogIOTest implements Serializable {
HCatalogIO.read()
.withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
.withContext(context)
- .withTable(TEST_TABLE_NAME);
+ .withTable(TEST_TABLE);
BoundedHCatalogSource source = new BoundedHCatalogSource(spec);
List<BoundedSource<HCatRecord>> unSplitSource = source.split(-1, OPTIONS);
@@ -260,8 +266,8 @@ public class HCatalogIOTest implements Serializable {
}
private void reCreateTestTable() throws CommandNeedRetryException {
- service.executeQuery("drop table " + TEST_TABLE_NAME);
- service.executeQuery("create table " + TEST_TABLE_NAME + "(mycol1 string, mycol2 int)");
+ service.executeQuery("drop table " + TEST_TABLE);
+ service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 int)");
}
private void prepareTestData() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
index f66e0bc..ae1eb50 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
@@ -35,15 +35,16 @@ import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;
/** Utility class for HCatalogIOTest. */
-public class HCatalogIOTestUtils {
- public static final String TEST_TABLE_NAME = "mytable";
-
- public static final int TEST_RECORDS_COUNT = 1000;
+class HCatalogIOTestUtils {
+ static final String TEST_DATABASE = "default";
+ static final String TEST_TABLE = "mytable";
+ static final String TEST_FILTER = "myfilter";
+ static final int TEST_RECORDS_COUNT = 1000;
private static final ReadEntity READ_ENTITY =
- new ReadEntity.Builder().withTable(TEST_TABLE_NAME).build();
+ new ReadEntity.Builder().withTable(TEST_TABLE).build();
private static final WriteEntity WRITE_ENTITY =
- new WriteEntity.Builder().withTable(TEST_TABLE_NAME).build();
+ new WriteEntity.Builder().withTable(TEST_TABLE).build();
/** Returns a ReaderContext instance for the passed datastore config params. */
static ReaderContext getReaderContext(Map<String, String> config) throws HCatException {
@@ -51,17 +52,18 @@ public class HCatalogIOTestUtils {
}
/** Returns a WriterContext instance for the passed datastore config params. */
- static WriterContext getWriterContext(Map<String, String> config) throws HCatException {
+ private static WriterContext getWriterContext(Map<String, String> config) throws HCatException {
return DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).prepareWrite();
}
/** Writes records to the table using the passed WriterContext. */
- static void writeRecords(WriterContext context) throws HCatException {
+ private static void writeRecords(WriterContext context) throws HCatException {
DataTransferFactory.getHCatWriter(context).write(getHCatRecords(TEST_RECORDS_COUNT).iterator());
}
/** Commits the pending writes to the database. */
- static void commitRecords(Map<String, String> config, WriterContext context) throws IOException {
+ private static void commitRecords(Map<String, String> config, WriterContext context)
+ throws IOException {
DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).commit(context);
}
@@ -100,7 +102,7 @@ public class HCatalogIOTestUtils {
}
/** returns a DefaultHCatRecord instance for passed value. */
- static DefaultHCatRecord toHCatRecord(int value) {
+ private static DefaultHCatRecord toHCatRecord(int value) {
return new DefaultHCatRecord(Arrays.<Object>asList("record " + value, value));
}
}
[2/2] beam git commit: This closes #3412
Posted by ie...@apache.org.
This closes #3412
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/28c6fd42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/28c6fd42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/28c6fd42
Branch: refs/heads/master
Commit: 28c6fd42eb95fc44fbc038758c31499a9482514d
Parents: 2d25b68 c11f0ff
Author: Ismaël Mejía <ie...@apache.org>
Authored: Wed Jun 21 21:58:21 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed Jun 21 21:58:21 2017 +0200
----------------------------------------------------------------------
.../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 113 ++++++++-----------
.../io/hcatalog/EmbeddedMetastoreService.java | 3 +-
.../beam/sdk/io/hcatalog/HCatalogIOTest.java | 54 +++++----
.../sdk/io/hcatalog/HCatalogIOTestUtils.java | 22 ++--
4 files changed, 90 insertions(+), 102 deletions(-)
----------------------------------------------------------------------