You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by rs...@apache.org on 2012/10/16 06:11:16 UTC
git commit: CRUNCH-92 : Read data from RDBMS - Create DataBaseSource
and IT test in crunch-contrib - Make users of DataBaseSource implement a
DBWritable & Writable to handle their own serialization,
to avoid invisible errors in serialization.[Update from Gabriel]
Updated Branches:
refs/heads/master 43c4e83a2 -> 2576896c9
CRUNCH-92 : Read data from RDBMS
- Create DataBaseSource and IT test in crunch-contrib
- Make users of DataBaseSource implement a DBWritable & Writable to
handle their own serialization, to avoid invisible errors in
serialization.[Update from Gabriel]
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2576896c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2576896c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2576896c
Branch: refs/heads/master
Commit: 2576896c91a0400af07e5fe32c1edd7adea29b48
Parents: 43c4e83
Author: Rahul Sharma <rs...@apache.org>
Authored: Tue Oct 16 09:32:33 2012 +0530
Committer: Rahul Sharma <rs...@apache.org>
Committed: Tue Oct 16 09:32:33 2012 +0530
----------------------------------------------------------------------
crunch-contrib/pom.xml | 7 +
.../crunch/contrib/io/jdbc/DataBaseSourceIT.java | 123 +++++++++++++++
crunch-contrib/src/it/resources/data.script | 22 +++
.../crunch/contrib/io/jdbc/DataBaseSource.java | 121 ++++++++++++++
.../crunch/contrib/io/jdbc/package-info.java | 23 +++
5 files changed, 296 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-contrib/pom.xml b/crunch-contrib/pom.xml
index c96f219..e98509e 100644
--- a/crunch-contrib/pom.xml
+++ b/crunch-contrib/pom.xml
@@ -53,6 +53,13 @@ under the License.
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>1.3.168</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
new file mode 100644
index 0000000..8fdb22d
--- /dev/null
+++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.io.jdbc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.contrib.io.jdbc.DataBaseSource;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.h2.tools.RunScript;
+import org.h2.tools.Server;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Integration test for {@link DataBaseSource}: loads the {@code TEST} table from
+ * {@code data.script} into an H2 database served over TCP and reads it back
+ * through an {@link MRPipeline}.
+ * <p>
+ * The class is {@link Serializable} and the server handle is {@code transient};
+ * presumably this is because the anonymous {@link DoFn} in
+ * {@link #testReadFromSource()} captures the enclosing test instance.
+ */
+public class DataBaseSourceIT extends CrunchTestSupport implements Serializable {
+
+  /** JDBC URL shared by the fixture loader and the source under test. */
+  private static final String JDBC_URL = "jdbc:h2:tcp://localhost/~/test";
+
+  private transient Server server;
+
+  /** Starts the H2 TCP server and runs {@code data.script} against it. */
+  @Before
+  public void start() throws Exception {
+    File file = tempDir.copyResourceFile("data.script");
+    server = Server.createTcpServer().start();
+    RunScript.execute(JDBC_URL, "sa", "", file.getAbsolutePath(), "utf-8", false);
+  }
+
+  /** Stops the H2 server, if {@link #start()} got far enough to create one. */
+  @After
+  public void stop() throws Exception {
+    // JUnit runs @After methods even when a @Before method throws, so the server
+    // may never have been assigned; guard to avoid masking the original failure.
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  /**
+   * One row of the query result: an integer id (column 1) and a name (column 2).
+   * Only the read side of {@link DBWritable} is implemented.
+   */
+  public static class IdentifiableName implements DBWritable, Writable {
+
+    public IntWritable id = new IntWritable();
+    public Text name = new Text();
+
+    /** Reads {@code id} then {@code name}, mirroring {@link #write(DataOutput)}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id.readFields(in);
+      name.readFields(in);
+    }
+
+    /** Writes {@code id} then {@code name}. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      id.write(out);
+      name.write(out);
+    }
+
+    /** Populates this record from the current row: column 1 is the id, column 2 the name. */
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+      id.set(resultSet.getInt(1));
+      name.set(resultSet.getString(2));
+    }
+
+    /**
+     * Writing back to the database is not supported by this test type.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void write(PreparedStatement preparedStatement) throws SQLException {
+      throw new UnsupportedOperationException("Not implemented");
+    }
+
+  }
+
+  /** Reads every row through {@link DataBaseSource} and checks the names that come back. */
+  @Test
+  public void testReadFromSource() throws Exception {
+    Pipeline pipeline = new MRPipeline(DataBaseSourceIT.class);
+    DataBaseSource<IdentifiableName> dbsrc = new DataBaseSource.Builder<IdentifiableName>(IdentifiableName.class)
+        .setDriverClass(org.h2.Driver.class)
+        .setUrl(JDBC_URL).setUsername("sa").setPassword("")
+        .selectSQLQuery("SELECT ID, NAME FROM TEST").countSQLQuery("select count(*) from Test").build();
+
+    PCollection<IdentifiableName> cdidata = pipeline.read(dbsrc);
+    PCollection<String> names = cdidata.parallelDo(new DoFn<IdentifiableName, String>() {
+
+      @Override
+      public void process(IdentifiableName input, Emitter<String> emitter) {
+        emitter.emit(input.name.toString());
+      }
+
+    }, Writables.strings());
+
+    List<String> nameList = Lists.newArrayList(names.materialize());
+    pipeline.done();
+
+    // Row order is not asserted: sizes are compared first, then the contents as sets.
+    assertEquals(2, nameList.size());
+    assertEquals(Sets.newHashSet("Hello", "World"), Sets.newHashSet(nameList));
+
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/it/resources/data.script
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/it/resources/data.script b/crunch-contrib/src/it/resources/data.script
new file mode 100644
index 0000000..16fc151
--- /dev/null
+++ b/crunch-contrib/src/it/resources/data.script
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+-- Fixture: rebuild the TEST table from scratch and seed it with two rows.
+DROP TABLE IF EXISTS TEST;
+CREATE TABLE TEST (ID INT PRIMARY KEY, NAME VARCHAR(255));
+INSERT INTO TEST (ID, NAME) VALUES (1, 'Hello');
+INSERT INTO TEST (ID, NAME) VALUES (2, 'World');
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
new file mode 100644
index 0000000..23ca685
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Driver;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * Source for reading from a database via a JDBC connection. Underlying
+ * database reading is provided by {@link DBInputFormat}.
+ * <p>
+ * A type that is input via this class must be a Writable that also implements
+ * DBWritable. Only the {@link DBWritable#readFields(java.sql.ResultSet)} method
+ * needs to be fully implemented from {@link DBWritable}.
+ * <p>
+ * Instances are created through {@link Builder}; the constructor is private.
+ *
+ * @param <T> The input type of this source
+ */
+public class DataBaseSource<T extends DBWritable & Writable> implements Source<T> {
+
+  private final Class<T> inputClass;
+  private final PType<T> ptype;
+  private String driverClass;
+  private String url;
+  private String username;
+  private String password;
+  private String selectClause;
+  // NOTE(review): public, unlike its sibling fields; this looks unintentional.
+  // Left as-is so existing readers of the field keep compiling.
+  public String countClause;
+
+  private DataBaseSource(Class<T> inputClass) {
+    this.inputClass = inputClass;
+    this.ptype = Writables.writables(inputClass);
+  }
+
+  /**
+   * Fluent builder for {@link DataBaseSource}.
+   * <p>
+   * The builder and its methods are public so that code outside this package
+   * can construct a source. Every setter writes straight into the single
+   * source instance that {@link #build()} returns.
+   *
+   * @param <T> The input type of the source being built
+   */
+  public static class Builder<T extends DBWritable & Writable> {
+
+    private final DataBaseSource<T> dataBaseSource;
+
+    /**
+     * @param inputClass the record class handed to {@link DBInputFormat} and
+     *          used to derive the source's {@link PType}
+     */
+    public Builder(Class<T> inputClass) {
+      this.dataBaseSource = new DataBaseSource<T>(inputClass);
+    }
+
+    /** Sets the JDBC driver; its class name is what gets stored. */
+    public Builder<T> setDriverClass(Class<? extends Driver> driverClass) {
+      dataBaseSource.driverClass = driverClass.getName();
+      return this;
+    }
+
+    /** Sets the JDBC connection URL. */
+    public Builder<T> setUrl(String url) {
+      dataBaseSource.url = url;
+      return this;
+    }
+
+    /** Sets the database user name. */
+    public Builder<T> setUsername(String username) {
+      dataBaseSource.username = username;
+      return this;
+    }
+
+    /** Sets the database password. */
+    public Builder<T> setPassword(String password) {
+      dataBaseSource.password = password;
+      return this;
+    }
+
+    /** Sets the query passed to {@link DBInputFormat} as the input query. */
+    public Builder<T> selectSQLQuery(String selectClause) {
+      dataBaseSource.selectClause = selectClause;
+      return this;
+    }
+
+    /** Sets the query passed to {@link DBInputFormat} as the input count query. */
+    public Builder<T> countSQLQuery(String countClause) {
+      dataBaseSource.countClause = countClause;
+      return this;
+    }
+
+    /**
+     * Returns the configured source. No copy is made: repeated calls return the
+     * same instance, and later setter calls on this builder still modify it.
+     */
+    public DataBaseSource<T> build() {
+      return dataBaseSource;
+    }
+  }
+
+  /**
+   * Stores the JDBC connection settings in the job's configuration and installs
+   * {@link DBInputFormat} with the select and count queries. {@code inputId} is
+   * not used.
+   */
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    Configuration configuration = job.getConfiguration();
+    DBConfiguration.configureDB(configuration, driverClass, url, username, password);
+    job.setInputFormatClass(DBInputFormat.class);
+    DBInputFormat.setInput(job, inputClass, selectClause, countClause);
+  }
+
+  /**
+   * Returns a fixed placeholder estimate of 1,000,000; the configuration is
+   * ignored and the database is not queried.
+   */
+  @Override
+  public long getSize(Configuration configuration) {
+    // TODO Do something smarter here
+    return 1000 * 1000;
+  }
+
+  /** Returns the writable-based {@link PType} derived from the input class. */
+  @Override
+  public PType<T> getType() {
+    return ptype;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java
new file mode 100644
index 0000000..e6cefe9
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for reading data from RDBMS using JDBC
+ */
+package org.apache.crunch.contrib.io.jdbc;
+