You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by rs...@apache.org on 2012/10/16 06:11:16 UTC

git commit: CRUNCH-92 : Read data from RDBMS - Create DataBaseSource and IT test in crunch-contrib - Make users of DataBaseSource implement a DBWritable & Writable to handle their own serialization, to avoid invisible errors in serialization.[Upd

Updated Branches:
  refs/heads/master 43c4e83a2 -> 2576896c9


CRUNCH-92 : Read data from RDBMS
 - Create DataBaseSource and IT test in crunch-contrib
 - Make users of DataBaseSource implement a DBWritable & Writable to
   handle their own serialization, to avoid invisible errors in
   serialization.[Update from Gabriel]


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2576896c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2576896c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2576896c

Branch: refs/heads/master
Commit: 2576896c91a0400af07e5fe32c1edd7adea29b48
Parents: 43c4e83
Author: Rahul Sharma <rs...@apache.org>
Authored: Tue Oct 16 09:32:33 2012 +0530
Committer: Rahul Sharma <rs...@apache.org>
Committed: Tue Oct 16 09:32:33 2012 +0530

----------------------------------------------------------------------
 crunch-contrib/pom.xml                             |    7 +
 .../crunch/contrib/io/jdbc/DataBaseSourceIT.java   |  123 +++++++++++++++
 crunch-contrib/src/it/resources/data.script        |   22 +++
 .../crunch/contrib/io/jdbc/DataBaseSource.java     |  121 ++++++++++++++
 .../crunch/contrib/io/jdbc/package-info.java       |   23 +++
 5 files changed, 296 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-contrib/pom.xml b/crunch-contrib/pom.xml
index c96f219..e98509e 100644
--- a/crunch-contrib/pom.xml
+++ b/crunch-contrib/pom.xml
@@ -53,6 +53,13 @@ under the License.
       <artifactId>hadoop-client</artifactId>
       <scope>provided</scope>
     </dependency>
+    
+     <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>1.3.168</version>
+      <scope>test</scope>
+    </dependency>
   
   </dependencies>
   

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
new file mode 100644
index 0000000..8fdb22d
--- /dev/null
+++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.io.jdbc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.contrib.io.jdbc.DataBaseSource;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.h2.tools.RunScript;
+import org.h2.tools.Server;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class DataBaseSourceIT extends CrunchTestSupport implements Serializable {
+  private transient Server server;
+
+  @Before
+  public void start() throws Exception {
+    File file = tempDir.copyResourceFile("data.script");
+    server = Server.createTcpServer().start();
+    RunScript.execute("jdbc:h2:tcp://localhost/~/test", "sa", "", file.getAbsolutePath(), "utf-8", false);
+  }
+
+  @After
+  public void stop() throws Exception {
+    server.stop();
+  }
+
+  public static class IdentifiableName implements DBWritable, Writable {
+
+    public IntWritable id = new IntWritable();
+    public Text name = new Text();
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id.readFields(in);
+      name.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      id.write(out);
+      name.write(out);
+    }
+
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+      id.set(resultSet.getInt(1));
+      name.set(resultSet.getString(2));
+    }
+
+    @Override
+    public void write(PreparedStatement preparedStatement) throws SQLException {
+      throw new UnsupportedOperationException("Not implemented");
+    }
+
+  }
+
+  @Test
+  public void testReadFromSource() throws Exception {
+    Pipeline pipeline = new MRPipeline(DataBaseSourceIT.class);
+    DataBaseSource<IdentifiableName> dbsrc = new DataBaseSource.Builder<IdentifiableName>(IdentifiableName.class)
+        .setDriverClass(org.h2.Driver.class)
+        .setUrl("jdbc:h2:tcp://localhost/~/test").setUsername("sa").setPassword("")
+        .selectSQLQuery("SELECT ID, NAME FROM TEST").countSQLQuery("select count(*) from Test").build();
+
+    PCollection<IdentifiableName> cdidata = pipeline.read(dbsrc);
+    PCollection<String> names = cdidata.parallelDo(new DoFn<IdentifiableName, String>() {
+
+      @Override
+      public void process(IdentifiableName input, Emitter<String> emitter) {
+        emitter.emit(input.name.toString());
+      }
+
+    }, Writables.strings());
+
+    List<String> nameList = Lists.newArrayList(names.materialize());
+    pipeline.done();
+
+    assertEquals(2, nameList.size());
+    assertEquals(Sets.newHashSet("Hello", "World"), Sets.newHashSet(nameList));
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/it/resources/data.script
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/it/resources/data.script b/crunch-contrib/src/it/resources/data.script
new file mode 100644
index 0000000..16fc151
--- /dev/null
+++ b/crunch-contrib/src/it/resources/data.script
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+DROP TABLE IF EXISTS TEST;
+CREATE TABLE TEST(ID INT PRIMARY KEY, NAME VARCHAR(255));
+INSERT INTO TEST VALUES(1, 'Hello');
+INSERT INTO TEST VALUES(2, 'World');

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
new file mode 100644
index 0000000..23ca685
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Driver;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * Source for reading from a database via a JDBC connection. Underlying
+ * database reading is provided by {@link DBInputFormat}.
+ * <p>
+ * A type that is input via this class must be a Writable that also implements
+ * DBWritable. Only the {@link DBWritable#readFields(java.sql.ResultSet)} method
+ * of {@link DBWritable} needs to be fully implemented.
+ * 
+ * @param <T> The input type of this source
+ */
+public class DataBaseSource<T extends DBWritable & Writable> implements Source<T> {
+
+  private Class<T> inputClass;
+  private PType<T> ptype;
+  private String driverClass;
+  private String url;
+  private String username;
+  private String password;
+  private String selectClause;
+  public String countClause;
+
+  private DataBaseSource(Class<T> inputClass) {
+    this.inputClass = inputClass;
+    this.ptype = Writables.writables(inputClass);
+  }
+
+  static class Builder<T extends DBWritable & Writable> {
+
+    private DataBaseSource<T> dataBaseSource;
+
+    public Builder(Class<T> inputClass) {
+      this.dataBaseSource = new DataBaseSource<T>(inputClass);
+    }
+
+    Builder<T> setDriverClass(Class<? extends Driver> driverClass) {
+      dataBaseSource.driverClass = driverClass.getName();
+      return this;
+    }
+
+    Builder<T> setUrl(String url) {
+      dataBaseSource.url = url;
+      return this;
+    }
+
+    Builder<T> setUsername(String username) {
+      dataBaseSource.username = username;
+      return this;
+    }
+
+    Builder<T> setPassword(String password) {
+      dataBaseSource.password = password;
+      return this;
+    }
+
+    Builder<T> selectSQLQuery(String selectClause) {
+      dataBaseSource.selectClause = selectClause;
+      return this;
+    }
+
+    Builder<T> countSQLQuery(String countClause) {
+      dataBaseSource.countClause = countClause;
+      return this;
+    }
+
+    DataBaseSource<T> build() {
+      return dataBaseSource;
+    }
+  }
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    Configuration configuration = job.getConfiguration();
+    DBConfiguration.configureDB(configuration, driverClass, url, username, password);
+    job.setInputFormatClass(DBInputFormat.class);
+    DBInputFormat.setInput(job, inputClass, selectClause, countClause);
+  }
+
+  @Override
+  public long getSize(Configuration configuration) {
+    // TODO Do something smarter here
+    return 1000 * 1000;
+  }
+
+  @Override
+  public PType<T> getType() {
+    return ptype;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java
new file mode 100644
index 0000000..e6cefe9
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** 
+ * Support for reading data from RDBMS using JDBC
+ */
+package org.apache.crunch.contrib.io.jdbc;
+