Posted to commits@pig.apache.org by ha...@apache.org on 2010/07/28 02:36:57 UTC

svn commit: r979917 - in /hadoop/pig/trunk: ./ contrib/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ ivy/

Author: hashutosh
Date: Wed Jul 28 00:36:56 2010
New Revision: 979917

URL: http://svn.apache.org/viewvc?rev=979917&view=rev
Log:
PIG-1229: allow pig to write output into a JDBC db
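
A rough usage sketch in Pig Latin (the MySQL driver class, JDBC URL,
credentials, and table below are hypothetical placeholders, not part of this
commit):

    REGISTER piggybank.jar;
    data = LOAD 'sales' AS (id:int, name:chararray, ratio:double);
    STORE data INTO 'ignored' USING
        org.apache.pig.piggybank.storage.DBStorage(
            'com.mysql.jdbc.Driver', 'jdbc:mysql://dbhost/mydb', 'user', 'pass',
            'INSERT INTO sales (id, name, ratio) VALUES (?, ?, ?)');

The location in the STORE clause is ignored (setStoreLocation is a no-op);
each tuple is bound to the parameterized INSERT statement and written in
batches, with the batch size configurable through an optional constructor
argument (default 100).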

Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java
Modified:
    hadoop/pig/trunk/contrib/CHANGES.txt
    hadoop/pig/trunk/contrib/piggybank/java/build.xml
    hadoop/pig/trunk/ivy.xml
    hadoop/pig/trunk/ivy/libraries.properties

Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=979917&r1=979916&r2=979917&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Wed Jul 28 00:36:56 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1229 allow pig to write output into a JDBC db (ankur via hashutosh)
+
 PIG-1385 UDF to create tuples and bags (hcbusy via gates)
 
 PIG-1331 Add Owl as a contrib project (ajaykidave via gates)

Modified: hadoop/pig/trunk/contrib/piggybank/java/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/build.xml?rev=979917&r1=979916&r2=979917&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/build.xml (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/build.xml Wed Jul 28 00:36:56 2010
@@ -37,6 +37,7 @@
     <property name="pigtest" value="../../../build/test/classes" />
     <property name="udfjar" value="piggybank.jar" />
     <property name="src.dir" value="src/main/java/org/apache/pig/piggybank" />
+    <property name="hsqldb.jar" value="../../../build/ivy/lib/Pig/hsqldb-1.8.0.10.jar"/>
 
     <!-- jar properties -->
     <property name=".javadoc" value="${build.docs}/api" />
@@ -70,6 +71,7 @@
         <pathelement location="${hadoopjar}"/>
         <pathelement location="${pigtest}"/>
         <pathelement location="${hive_execjar}"/>
+        <pathelement location="${hsqldb.jar}"/>
     </path>
 
     <path id="test.classpath">

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java?rev=979917&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java Wed Jul 28 00:36:56 2010
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import java.io.IOException;
+import java.sql.*;
+
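+/**
+ * A StoreFunc that writes each output tuple to a database table over JDBC:
+ * tuple fields are bound to a parameterized INSERT statement which is
+ * executed in batches.
+ *
+ * Constructor arguments: JDBC driver class name, connection URL, optional
+ * user name and password, the parameterized INSERT statement, and an
+ * optional batch size (default 100).
+ */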
+public class DBStorage extends StoreFunc {
+  private final Log log = LogFactory.getLog(getClass());
+
+  private PreparedStatement ps;
+  private Connection con;
+  private String jdbcURL;
+  private String user;
+  private String pass;
+  private int batchSize;
+  private int count = 0;
+  private String insertQuery;
+
+  public DBStorage(String driver, String jdbcURL, String insertQuery) {
+    this(driver, jdbcURL, null, null, insertQuery, "100");
+  }
+
+  public DBStorage(String driver, String jdbcURL, String user, String pass,
+      String insertQuery) throws SQLException {
+    this(driver, jdbcURL, user, pass, insertQuery, "100");
+  }
+
+  public DBStorage(String driver, String jdbcURL, String user, String pass,
+      String insertQuery, String batchSize) throws RuntimeException {
+    log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX,"
+        + insertQuery + ")");
+    try {
+      Class.forName(driver);
+    } catch (ClassNotFoundException e) {
+      log.error("can't load DB driver:" + driver, e);
+      throw new RuntimeException("Can't load DB Driver", e);
+    }
+    this.jdbcURL = jdbcURL;
+    this.user = user;
+    this.pass = pass;
+    this.insertQuery = insertQuery;
+    this.batchSize = Integer.parseInt(batchSize);
+  }
+
+  /**
+   * Write the tuple directly to the database.
+   */
+  public void putNext(Tuple tuple) throws IOException {
+    int sqlPos = 1;
+    try {
+      int size = tuple.size();
+      for (int i = 0; i < size; i++) {
+        try {
+          Object field = tuple.get(i);
+
+          switch (DataType.findType(field)) {
+          case DataType.NULL:
+            ps.setNull(sqlPos, java.sql.Types.VARCHAR);
+            sqlPos++;
+            break;
+
+          case DataType.BOOLEAN:
+            ps.setBoolean(sqlPos, (Boolean) field);
+            sqlPos++;
+            break;
+
+          case DataType.INTEGER:
+            ps.setInt(sqlPos, (Integer) field);
+            sqlPos++;
+            break;
+
+          case DataType.LONG:
+            ps.setLong(sqlPos, (Long) field);
+            sqlPos++;
+            break;
+
+          case DataType.FLOAT:
+            ps.setFloat(sqlPos, (Float) field);
+            sqlPos++;
+            break;
+
+          case DataType.DOUBLE:
+            ps.setDouble(sqlPos, (Double) field);
+            sqlPos++;
+            break;
+
+          case DataType.BYTEARRAY:
+            byte[] b = ((DataByteArray) field).get();
+            ps.setBytes(sqlPos, b);
+
+            sqlPos++;
+            break;
+          case DataType.CHARARRAY:
+            ps.setString(sqlPos, (String) field);
+            sqlPos++;
+            break;
+          case DataType.BYTE:
+            ps.setByte(sqlPos, (Byte) field);
+            sqlPos++;
+            break;
+
+          case DataType.MAP:
+          case DataType.TUPLE:
+          case DataType.BAG:
+            throw new RuntimeException("Cannot store a non-flat tuple "
+                + "using DbStorage");
+
+          default:
+            throw new RuntimeException("Unknown datatype "
+                + DataType.findType(field));
+
+          }
+
+        } catch (ExecException ee) {
+          throw new RuntimeException(ee);
+        }
+
+      }
+      ps.addBatch();
+      count++;
+      if (count > batchSize) {
+        count = 0;
+        ps.executeBatch();
+        ps.clearBatch();
+        ps.clearParameters();
+      }
+    } catch (SQLException e) {
+      try {
+        log.error("Unable to insert record:" + tuple.toDelimitedString("\t"), e);
+      } catch (ExecException ee) {
+        // do nothing
+      }
+      if (e.getErrorCode() == 1366) {
+        // Errors with code 1366 come from bad UTF-8 character encoding in the
+        // data; ignore them for now. TODO: temporary fix - find a better way
+        // of handling them in the INSERT statement itself.
+      } else {
+        throw new RuntimeException("JDBC error", e);
+      }
+    }
+  }
+
+  class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> {
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException,
+        InterruptedException {
+      // IGNORE
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return new OutputCommitter() {
+
+        @Override
+        public void abortTask(TaskAttemptContext context) throws IOException {
+          try {
+            if (ps != null) {
+              ps.close();
+            }
+            if (con != null) {
+              con.rollback();
+              con.close();
+            }
+          } catch (SQLException sqe) {
+            throw new IOException(sqe);
+          }
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext context) throws IOException {
+          if (ps != null) {
+            try {
+              ps.executeBatch();
+              con.commit();
+              ps.close();
+              con.close();
+              ps = null;
+              con = null;
+            } catch (SQLException e) {
+              log.error("ps.close", e);
+              throw new IOException("JDBC Error", e);
+            }
+          }
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext context)
+            throws IOException {
+          return true;
+        }
+
+        @Override
+        public void cleanupJob(JobContext context) throws IOException {
+          // IGNORE
+        }
+
+        @Override
+        public void setupJob(JobContext context) throws IOException {
+          // IGNORE
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext context) throws IOException {
+          // IGNORE
+        }
+      };
+    }
+
+    @Override
+    public RecordWriter<NullWritable, NullWritable> getRecordWriter(
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      // We don't use a record writer to write to the database.
+      return new RecordWriter<NullWritable, NullWritable>() {
+        @Override
+        public void close(TaskAttemptContext context) {
+          // Noop
+        }
+
+        @Override
+        public void write(NullWritable k, NullWritable v) {
+          // Noop
+        }
+      };
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public OutputFormat getOutputFormat()
+      throws IOException {
+    return new MyDBOutputFormat();
+  }
+
+  /**
+   * Initialise the database connection and prepared statement here.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void prepareToWrite(RecordWriter writer)
+      throws IOException {
+    ps = null;
+    con = null;
+    if (insertQuery == null) {
+      throw new IOException("SQL Insert command not specified");
+    }
+    try {
+      if (user == null || pass == null) {
+        con = DriverManager.getConnection(jdbcURL);
+      } else {
+        con = DriverManager.getConnection(jdbcURL, user, pass);
+      }
+      con.setAutoCommit(false);
+      ps = con.prepareStatement(insertQuery);
+    } catch (SQLException e) {
+      log.error("Unable to connect to JDBC @" + jdbcURL);
+      throw new IOException("JDBC Error", e);
+    }
+    count = 0;
+  }
+
+  @Override
+  public void setStoreLocation(String location, Job job) throws IOException {
+    // IGNORE since we are writing records to DB.
+  }
+}

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java?rev=979917&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java Wed Jul 28 00:36:56 2010
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.Util;
+import org.hsqldb.Server;
+import org.junit.After;
+import org.junit.Before;
+
+import junit.framework.TestCase;
+
+public class TestDBStorage extends TestCase {
+
+	private PigServer pigServer;
+	private MiniCluster cluster;
+	private Server dbServer;
+	private String driver = "org.hsqldb.jdbcDriver";
+	// private String url = "jdbc:hsqldb:mem:.";
+	private String dblocation = "/tmp/batchtest";
+	private String url = "jdbc:hsqldb:file:" + dblocation
+			+ ";hsqldb.default_table_type=cached;hsqldb.cache_rows=100";
+	private String user = "sa";
+	private String password = "";
+
+	private static final String INPUT_FILE = "datafile.txt";
+
+	public TestDBStorage() throws ExecException, IOException {
+		// Initialise Pig server
+		cluster = MiniCluster.buildCluster();
+		pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+		pigServer.getPigContext().getProperties()
+				.setProperty("mapred.map.max.attempts", "1");
+		pigServer.getPigContext().getProperties()
+				.setProperty("mapred.reduce.max.attempts", "1");
+		System.out.println("Pig server initialized successfully");
+		// Initialise DBServer
+		dbServer = new Server();
+		dbServer.setDatabaseName(0, "batchtest");
+		// dbServer.setDatabasePath(0, "mem:test;sql.enforce_strict_size=true");
+		dbServer.setDatabasePath(0,
+				"file:/tmp/batchtest;sql.enforce_strict_size=true");
+		dbServer.setLogWriter(null);
+		dbServer.setErrWriter(null);
+		try {
+			Class.forName(driver);
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.out.println(this + ".setUp() error: " + e.getMessage());
+		}
+		System.out.println("Database server initialized successfully");
+	}
+
+	private void createFile() throws IOException {
+		PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+		w.println("100\tapple\t1.0");
+		w.println("100\torange\t2.0");
+		w.println("100\tbanana\t1.1");
+		w.close();
+		Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+	}
+
+	private void createTable() throws IOException {
+		Connection con = null;
+		String sql = "create table ttt (id integer, name varchar(32), ratio double)";
+		try {
+			con = DriverManager.getConnection(url, user, password);
+		} catch (SQLException sqe) {
+			throw new IOException("Unable to obtain a connection to the database",
+					sqe);
+		}
+		try {
+			Statement st = con.createStatement();
+			st.executeUpdate(sql);
+			st.close();
+			con.close();
+		} catch (SQLException sqe) {
+			throw new IOException("Cannot create table", sqe);
+		}
+	}
+
+	@Before
+	public void setUp() throws IOException {
+		createFile();
+		createTable();
+	}
+
+	@After
+	public void tearDown() throws IOException {
+		new File(INPUT_FILE).delete();
+		Util.deleteFile(cluster, INPUT_FILE);
+		pigServer.shutdown();
+		dbServer.stop();
+
+		File[] dbFiles = new File("/tmp").listFiles(new FilenameFilter() {
+			@Override
+			public boolean accept(File dir, String name) {
+				if (name.startsWith("batchtest")) {
+					return true;
+				} else {
+					return false;
+				}
+			}
+		});
+		if (dbFiles != null) {
+			for (File file : dbFiles) {
+				file.delete();
+			}
+		}
+	}
+
+	public void testWriteToDB() throws IOException {
+		String insertQuery = "insert into ttt (id, name, ratio) values (?,?,?)";
+		String dbStore = "org.apache.pig.piggybank.storage.DBStorage('" + driver
+				+ "', '" + url + "','" + insertQuery + "');";
+		pigServer.setBatchOn();
+		pigServer.registerQuery("A = LOAD '" + INPUT_FILE
+				+ "' as (id:int, fruit:chararray, ratio:double);");
+		pigServer.registerQuery("STORE A INTO 'dummy' USING " + dbStore);
+		ExecJob job = pigServer.executeBatch().get(0);
+		try {
+			while (!job.hasCompleted()) {
+				Thread.sleep(1000);
+			}
+		} catch (InterruptedException ie) {
+			// ignore
+		}
+
+		assertNotSame("Failed: " + job.getException(), job.getStatus(),
+				ExecJob.JOB_STATUS.FAILED);
+
+		Connection con = null;
+		String selectQuery = "select id, name, ratio from ttt order by name";
+		try {
+			con = DriverManager.getConnection(url, user, password);
+		} catch (SQLException sqe) {
+			throw new IOException(
+					"Unable to obtain database connection for data verification", sqe);
+		}
+		try {
+			PreparedStatement ps = con.prepareStatement(selectQuery);
+			ResultSet rs = ps.executeQuery();
+
+			int expId = 100;
+			String[] expNames = { "apple", "banana", "orange" };
+			double[] expRatios = { 1.0, 1.1, 2.0 };
+			for (int i = 0; i < 3 && rs.next(); i++) {
+				assertEquals("Id mismatch", expId, rs.getInt(1));
+				assertEquals("Name mismatch", expNames[i], rs.getString(2));
+				assertEquals("Ratio mismatch", expRatios[i], rs.getDouble(3), 0.0001);
+			}
+		} catch (SQLException sqe) {
+			throw new IOException(
+					"Unable to read data from database for verification", sqe);
+		}
+	}
+}

Modified: hadoop/pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy.xml?rev=979917&r1=979916&r2=979917&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy.xml (original)
+++ hadoop/pig/trunk/ivy.xml Wed Jul 28 00:36:56 2010
@@ -74,5 +74,7 @@
     <dependency org="joda-time" name="joda-time" rev="${joda-time.version}" conf="compile->master"/>
 	<dependency org="org.python" name="jython" rev="${jython.version}" conf="compile->master"/>
     <!--ATM hbase, hbase-test.jar, hadoop.jar are resolved from the lib dir--> 	
+    <dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}"
+      conf="test->default" />
     </dependencies>
 </ivy-module>

Modified: hadoop/pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/libraries.properties?rev=979917&r1=979916&r2=979917&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy/libraries.properties (original)
+++ hadoop/pig/trunk/ivy/libraries.properties Wed Jul 28 00:36:56 2010
@@ -22,6 +22,8 @@ commons-cli.version=1.0
 commons-logging.version=1.0.3
 checkstyle.version=4.2
 
+hsqldb.version=1.8.0.10
+
 ivy.version=2.0.0-rc2
 
 javacc.version=4.2