You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/11 15:16:01 UTC
[1/2] beam git commit: Move HashingFn to io/common, switch to better hash
Repository: beam
Updated Branches:
refs/heads/master 84a96297c -> c46b256d7
Move HashingFn to io/common, switch to better hash
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b615013b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b615013b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b615013b
Branch: refs/heads/master
Commit: b615013b9c941038d3e9fd96a153f0894f52f183
Parents: 84a9629
Author: Stephen Sisk <si...@google.com>
Authored: Fri Apr 7 12:59:28 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 08:15:49 2017 -0700
----------------------------------------------------------------------
sdks/java/io/common/pom.xml | 4 +
.../apache/beam/sdk/io/common/HashingFn.java | 109 +++++++++++++++++++
sdks/java/io/hadoop/jdk1.8-tests/pom.xml | 46 +-------
.../inputformat/HIFIOWithElasticTest.java | 6 +-
.../hadoop/inputformat/hashing/HashingFn.java | 109 -------------------
.../integration/tests/HIFIOCassandraIT.java | 2 +-
.../integration/tests/HIFIOElasticIT.java | 2 +-
7 files changed, 124 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index fa51b47..3f6d79d 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -34,5 +34,9 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
new file mode 100644
index 0000000..d534c87
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * Custom Function for Hashing. The combiner is combineUnordered, and accumulator is a
+ * HashCode.
+ */
+public class HashingFn extends CombineFn<String, HashingFn.Accum, String> {
+
+ /**
+ * Serializable Class to store the HashCode of input String.
+ */
+ public static class Accum implements Serializable {
+ HashCode hashCode = null;
+
+ public Accum(HashCode value) {
+ this.hashCode = value;
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ }
+ }
+
+ @Override
+ public Accum addInput(Accum accum, String input) {
+ List<HashCode> elementHashes = Lists.newArrayList();
+ if (accum.hashCode != null) {
+ elementHashes.add(accum.hashCode);
+ }
+ HashCode inputHashCode = Hashing.murmur3_128().hashString(input, StandardCharsets.UTF_8);
+ elementHashes.add(inputHashCode);
+ accum.hashCode = Hashing.combineUnordered(elementHashes);
+ return accum;
+ }
+
+ @Override
+ public Accum mergeAccumulators(Iterable<Accum> accums) {
+ Accum merged = createAccumulator();
+ List<HashCode> elementHashes = Lists.newArrayList();
+ for (Accum accum : accums) {
+ if (accum.hashCode != null) {
+ elementHashes.add(accum.hashCode);
+ }
+ }
+ merged.hashCode = Hashing.combineUnordered(elementHashes);
+ return merged;
+ }
+
+ @Override
+ public String extractOutput(Accum accum) {
+ // Return the combined hash code of list of elements in the Pcollection.
+ String consolidatedHash = "";
+ if (accum.hashCode != null) {
+ consolidatedHash = accum.hashCode.toString();
+ }
+ return consolidatedHash;
+ }
+
+ @Override
+ public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder)
+ throws CannotProvideCoderException {
+ return SerializableCoder.of(Accum.class);
+ }
+
+ @Override
+ public Coder<String> getDefaultOutputCoder(CoderRegistry registry, Coder<String> inputCoder) {
+ return inputCoder;
+ }
+
+ @Override
+ public Accum createAccumulator() {
+ return new Accum(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
index 4c510ae..84b923a 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
+++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
@@ -36,41 +36,6 @@
<build>
<plugins>
- <plugin>
- <!-- Guava shading is required as Cassandra tests require version
- 19 of Guava, by default project wide Guava shading may not suffice as it
- loads a different version of guava which will not work for Cassandra tests -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <includes>
- <include>com.google.guava:guava:19.0</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>com.google.common</pattern>
- <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.common</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.thirdparty</pattern>
- <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.thirdparty</shadedPattern>
- </relocation>
- </relocations>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
<!-- Overridden enforcer plugin for JDK1.8 for running tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -178,11 +143,6 @@
<artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -208,6 +168,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-common</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.6.2</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
index 599a4a1..51cbd5a 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
@@ -25,7 +25,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
+import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -105,7 +105,7 @@ public class HIFIOWithElasticTest implements Serializable {
@Test
public void testHifIOWithElastic() {
// Expected hashcode is evaluated during insertion time one time and hardcoded here.
- String expectedHashCode = "e2098f431f90193aa4545e033e6fd2217aafe7b6";
+ String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc";
Configuration conf = getConfiguration();
PCollection<KV<Text, LinkedMapWritable>> esData =
pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
@@ -135,7 +135,7 @@ public class HIFIOWithElasticTest implements Serializable {
@Test
public void testHifIOWithElasticQuery() {
long expectedRowCount = 1L;
- String expectedHashCode = "caa37dbd8258e3a7f98932958c819a57aab044ec";
+ String expectedHashCode = "cfbf3e5c993d44e57535a114e25f782d";
Configuration conf = getConfiguration();
String fieldValue = ELASTIC_TYPE_ID_PREFIX + "2";
String query = "{"
http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
deleted file mode 100644
index fe37048..0000000
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.beam.sdk.io.hadoop.inputformat.hashing;
-
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * Custom Function for Hashing. The combiner is combineUnordered, and accumulator is a
- * HashCode.
- */
-public class HashingFn extends CombineFn<String, HashingFn.Accum, String> {
-
- /**
- * Serializable Class to store the HashCode of input String.
- */
- public static class Accum implements Serializable {
- HashCode hashCode = null;
-
- public Accum(HashCode value) {
- this.hashCode = value;
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- }
- }
-
- @Override
- public Accum addInput(Accum accum, String input) {
- List<HashCode> elementHashes = Lists.newArrayList();
- if (accum.hashCode != null) {
- elementHashes.add(accum.hashCode);
- }
- HashCode inputHashCode = Hashing.sha1().hashString(input, StandardCharsets.UTF_8);
- elementHashes.add(inputHashCode);
- accum.hashCode = Hashing.combineUnordered(elementHashes);
- return accum;
- }
-
- @Override
- public Accum mergeAccumulators(Iterable<Accum> accums) {
- Accum merged = createAccumulator();
- List<HashCode> elementHashes = Lists.newArrayList();
- for (Accum accum : accums) {
- if (accum.hashCode != null) {
- elementHashes.add(accum.hashCode);
- }
- }
- merged.hashCode = Hashing.combineUnordered(elementHashes);
- return merged;
- }
-
- @Override
- public String extractOutput(Accum accum) {
- // Return the combined hash code of list of elements in the Pcollection.
- String consolidatedHash = "";
- if (accum.hashCode != null) {
- consolidatedHash = accum.hashCode.toString();
- }
- return consolidatedHash;
- }
-
- @Override
- public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder)
- throws CannotProvideCoderException {
- return SerializableCoder.of(Accum.class);
- }
-
- @Override
- public Coder<String> getDefaultOutputCoder(CoderRegistry registry, Coder<String> inputCoder) {
- return inputCoder;
- }
-
- @Override
- public Accum createAccumulator() {
- return new Accum(null);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java
index bf9a5fd..bf4cb92 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java
@@ -21,9 +21,9 @@ import com.datastax.driver.core.Row;
import java.io.Serializable;
+import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
import org.apache.beam.sdk.io.hadoop.inputformat.custom.options.HIFTestOptions;
-import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java
index 13c0cbc..65ef8f2 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java
@@ -17,9 +17,9 @@ package org.apache.beam.sdk.io.hadoop.inputformat.integration.tests;
import java.io.IOException;
import java.io.Serializable;
+import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
import org.apache.beam.sdk.io.hadoop.inputformat.custom.options.HIFTestOptions;
-import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
[2/2] beam git commit: This closes #2463
Posted by dh...@apache.org.
This closes #2463
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c46b256d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c46b256d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c46b256d
Branch: refs/heads/master
Commit: c46b256d7dbde1cfde311994faed31e78f143957
Parents: 84a9629 b615013
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 11 08:15:51 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 08:15:51 2017 -0700
----------------------------------------------------------------------
sdks/java/io/common/pom.xml | 4 +
.../apache/beam/sdk/io/common/HashingFn.java | 109 +++++++++++++++++++
sdks/java/io/hadoop/jdk1.8-tests/pom.xml | 46 +-------
.../inputformat/HIFIOWithElasticTest.java | 6 +-
.../hadoop/inputformat/hashing/HashingFn.java | 109 -------------------
.../integration/tests/HIFIOCassandraIT.java | 2 +-
.../integration/tests/HIFIOElasticIT.java | 2 +-
7 files changed, 124 insertions(+), 154 deletions(-)
----------------------------------------------------------------------