Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/10/07 23:14:21 UTC

svn commit: r1180275 - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/

Author: khorgath
Date: Fri Oct  7 23:14:21 2011
New Revision: 1180275

URL: http://svn.apache.org/viewvc?rev=1180275&view=rev
Log:
HCATALOG-73 Output Storage Driver for HBase (Direct PUTs) (toffer via khorgath)

Added:
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/hbase/build.xml

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1180275&r1=1180274&r2=1180275&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Oct  7 23:14:21 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-73. Output Storage Driver for HBase (Direct PUTs) (toffer via khorgath)
+
   HCAT-74. ResultConverter for HBase Storage Drivers (avandana via khorgath)
 
   HCAT-89. Support for creating non-native tables (avandana via gates)

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/build.xml?rev=1180275&r1=1180274&r2=1180275&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/build.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/build.xml Fri Oct  7 23:14:21 2011
@@ -70,6 +70,7 @@
     <property name="test.all.file" value="${test.src.dir}/all-tests"/>
     <property name="test.exclude.file" value="${test.src.dir}/excluded-tests"/>
     <property name="test.output" value="no"/>
+    <property name="test.warehouse.dir" value="/tmp/hcat_junit_warehouse"/>
     <property name="hive.conf.dir" value="${hive.root}/conf"/>
 
     <!-- ivy properteis set here -->
@@ -241,9 +242,12 @@
 
             <delete dir="${test.log.dir}"/>
             <mkdir dir="${test.log.dir}"/>
+            <delete dir="${test.warehouse.dir}"/>
+            <mkdir dir="${test.warehouse.dir}"/>
             <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
                    fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}"
                    errorProperty="tests.failed" failureProperty="tests.failed">
+                <sysproperty key="hive.metastore.warehouse.dir" value="${test.warehouse.dir}"/>
                 <classpath>
                     <pathelement location="${test.build.classes}" />
                     <pathelement location="." />

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Fri Oct  7 23:14:21 2011
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+import java.io.IOException;
+
+/**
+ * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put API to write each row to HBase one a
+ * time. Presently it is just using TableOutputFormat as the underlying implementation in the future we can
+ * tune this to make the writes faster such as permanently disabling WAL, caching, etc.
+ */
+class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>,Writable> implements Configurable {
+
+    private TableOutputFormat<WritableComparable<?>> outputFormat;
+
+    public HBaseDirectOutputFormat() {
+        this.outputFormat = new TableOutputFormat<WritableComparable<?>>();
+    }
+
+    @Override
+    public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return outputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        outputFormat.checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return outputFormat.getOutputCommitter(context);
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        String tableName = conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY);
+        conf = new Configuration(conf);
+        conf.set(TableOutputFormat.OUTPUT_TABLE,tableName);
+        outputFormat.setConf(conf);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return outputFormat.getConf();
+    }
+}
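
For reference, a minimal sketch (not part of this commit) of a map-only job wired
straight to HBaseDirectOutputFormat, mirroring the directOutputFormatTest added
further down. The table name, the input path handling and the small PutMapper are
illustrative assumptions; hbase-site.xml is assumed to be on the classpath.

package org.apache.hcatalog.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class DirectPutSketch {

    /** Turns each "rowkey,value" input line into a single Put on family "f". */
    public static class PutMapper extends Mapper<LongWritable, Text, BytesWritable, Put> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split(",");
            Put put = new Put(Bytes.toBytes(parts[0]));
            put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(parts[1]));
            context.write(new BytesWritable(Bytes.toBytes(parts[0])), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // HBaseDirectOutputFormat.setConf() copies this key into TableOutputFormat.OUTPUT_TABLE
        conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, "my_table"); // illustrative table name

        Job job = new Job(conf, "hbase direct put sketch");
        job.setJarByClass(DirectPutSketch.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(PutMapper.class);
        job.setOutputFormatClass(HBaseDirectOutputFormat.class);
        job.setMapOutputKeyClass(BytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}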

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java Fri Oct  7 23:14:21 2011
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * HBase storage driver implementation which uses "direct" writes to HBase for writing out records.
+ */
+public class HBaseDirectOutputStorageDriver extends HCatOutputStorageDriver {
+    private HCatTableInfo tableInfo;
+    private HBaseDirectOutputFormat outputFormat;
+    private ResultConverter converter;
+    private OutputJobInfo outputJobInfo;
+    private HCatSchema schema;
+    private HCatSchema outputSchema;
+
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) throws IOException {
+        super.initialize(context, hcatProperties);
+        String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        if( jobString == null ) {
+            throw new IOException("OutputJobInfo information not found in JobContext. HCatOutputFormat.setOutput() not called?");
+        }
+        outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+        tableInfo = outputJobInfo.getTableInfo();
+        schema = tableInfo.getDataColumns();
+
+        List<FieldSchema> fields = HCatUtil.getFieldSchemaList(outputSchema.getFields());
+        hcatProperties.setProperty(Constants.LIST_COLUMNS,
+                MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+        hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+                MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+
+        //override table properties with user defined ones
+        //in the future we should be more selective on what to override
+        hcatProperties.putAll(outputJobInfo.getProperties());
+        //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize being called
+        converter = new HBaseSerDeResultConverter(schema,
+                outputSchema,
+                hcatProperties);
+        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,tableInfo.getTableName());
+        outputFormat = new HBaseDirectOutputFormat();
+        outputFormat.setConf(context.getConfiguration());
+    }
+
+    @Override
+    public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+        return outputFormat;
+    }
+
+    @Override
+    public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException {
+        this.outputSchema = schema;
+    }
+
+    @Override
+    public WritableComparable<?> generateKey(HCatRecord value) throws IOException {
+        //HBase doesn't use KEY as part of output
+        return null;
+    }
+
+    @Override
+    public Writable convertValue(HCatRecord value) throws IOException {
+        return converter.convert(value);
+    }
+
+    @Override
+    public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException {
+        //no partitions for this driver
+    }
+
+    @Override
+    public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void setOutputPath(JobContext jobContext, String location) throws IOException {
+        //no output path
+    }
+
+    @Override
+    public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+        return null;
+    }
+}
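
For reference, a minimal sketch (not part of this commit) of the client-side output
wiring that reaches this driver. Client code only talks to HCatOutputFormat; the
driver is looked up from the table's HCatConstants.HCAT_OSD_CLASS parameter, which
registerHBaseTable() in the test further down sets to this class. The table name and
the caller-supplied mapper are illustrative assumptions.

package org.apache.hcatalog.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class HCatHBaseWriteSketch {

    /**
     * Configures the output side of a map-only job that writes HCatRecords to an
     * HCatalog table backed by this storage driver. Input format/paths and the mapper
     * (which must emit HCatRecord values, like MapHCatWrite in the test further down)
     * are left to the caller.
     */
    public static Job configureOutput(Configuration conf, String tableName,
                                      Class<? extends Mapper> mapperClass) throws Exception {
        Job job = new Job(conf, "hcat hbase write sketch");
        job.setMapperClass(mapperClass);
        job.setOutputFormatClass(HCatOutputFormat.class);
        // same OutputJobInfo.create(...) arguments as the test further down; only the table name differs
        OutputJobInfo outputJobInfo = OutputJobInfo.create(null, tableName, null, null, null);
        outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
        HCatOutputFormat.setOutput(job, outputJobInfo);
        job.setMapOutputKeyClass(BytesWritable.class);
        job.setMapOutputValueClass(HCatRecord.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setNumReduceTasks(0);
        return job;
    }
}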

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java Fri Oct  7 23:14:21 2011
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Map;
+
+/**
+ * MiniCluster class composed of a number of Hadoop MiniCluster implementations
+ * and the other daemons needed for testing (HBase, Hive MetaStore, Zookeeper, MiniMRCluster)
+ */
+public class ManyMiniCluster {
+
+    //MR stuff
+    private boolean miniMRClusterEnabled;
+    private MiniMRCluster mrCluster;
+    private int numTaskTrackers;
+    private JobConf jobConf;
+
+    //HBase stuff
+    private boolean miniHBaseClusterEnabled;
+    private MiniHBaseCluster hbaseCluster;
+    private String hbaseRoot;
+    private Configuration hbaseConf;
+    private String hbaseDir;
+
+    //ZK Stuff
+    private boolean miniZookeeperClusterEnabled;
+    private MiniZooKeeperCluster zookeeperCluster;
+    private int zookeeperPort;
+    private String zookeeperDir;
+
+    //DFS Stuff
+    private MiniDFSCluster dfsCluster;
+
+    //Hive Stuff
+    private boolean miniHiveMetastoreEnabled;
+    private HiveConf hiveConf;
+    private HiveMetaStoreClient hiveMetaStoreClient;
+
+    private final File workDir;
+    private boolean started = false;
+
+
+    /**
+     * Create a cluster instance using a builder which exposes the configurable options
+     * @param workDir working directory ManyMiniCluster will use for all of its *MiniCluster instances
+     * @return a Builder instance
+     */
+    public static Builder create(File workDir) {
+        return new Builder(workDir);
+    }
+
+    private ManyMiniCluster(Builder b) {
+        workDir = b.workDir;
+        numTaskTrackers = b.numTaskTrackers;
+        hiveConf = b.hiveConf;
+        jobConf = b.jobConf;
+        hbaseConf = b.hbaseConf;
+        miniMRClusterEnabled = b.miniMRClusterEnabled;
+        miniHBaseClusterEnabled = b.miniHBaseClusterEnabled;
+        miniHiveMetastoreEnabled = b.miniHiveMetastoreEnabled;
+        miniZookeeperClusterEnabled = b.miniZookeeperClusterEnabled;
+    }
+
+    protected synchronized void start() {
+        try {
+            if (!started) {
+                FileUtil.fullyDelete(workDir);
+                if(miniMRClusterEnabled) {
+                    setupMRCluster();
+                }
+                if(miniZookeeperClusterEnabled || miniHBaseClusterEnabled) {
+                    miniZookeeperClusterEnabled = true;
+                    setupZookeeper();
+                }
+                if(miniHBaseClusterEnabled) {
+                    setupHBaseCluster();
+                }
+                if(miniHiveMetastoreEnabled) {
+                    setUpMetastore();
+                }
+            }
+        } catch(Exception e) {
+            throw new IllegalStateException("Failed to setup cluster",e);
+        }
+    }
+
+    protected synchronized void stop() {
+        if (hbaseCluster != null) {
+            HConnectionManager.deleteAllConnections(true);
+            try {
+                hbaseCluster.shutdown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            hbaseCluster = null;
+        }
+        if (zookeeperCluster != null) {
+            try {
+                zookeeperCluster.shutdown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            zookeeperCluster = null;
+        }
+        if(mrCluster != null) {
+            try {
+                mrCluster.shutdown();
+            } catch(Exception e) {
+                e.printStackTrace();
+            }
+            mrCluster = null;
+        }
+        if(dfsCluster != null) {
+            try {
+                dfsCluster.getFileSystem().close();
+                dfsCluster.shutdown();
+            } catch(Exception e) {
+                e.printStackTrace();
+            }
+            dfsCluster = null;
+        }
+        try {
+            FileSystem.closeAll();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        started = false;
+    }
+
+    /**
+     * @return Configuration of mini HBase cluster
+     */
+    public Configuration getHBaseConf() {
+        return HBaseConfiguration.create(hbaseConf);
+    }
+
+    /**
+     * @return Configuration of mini MR cluster
+     */
+    public Configuration getJobConf() {
+        return new Configuration(jobConf);
+    }
+
+    /**
+     * @return Configuration of the Hive Metastore; the metastore is standalone (in-process), not a daemon
+     */
+    public HiveConf getHiveConf() {
+        return new HiveConf(hiveConf);
+    }
+
+    /**
+     * @return Filesystem used by MiniMRCluster and MiniHBaseCluster
+     */
+    public FileSystem getFileSystem() {
+        try {
+            return FileSystem.get(jobConf);
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to get FileSystem",e);
+        }
+    }
+
+    /**
+     * @return Metastore client instance
+     */
+    public HiveMetaStoreClient getHiveMetaStoreClient() {
+        return hiveMetaStoreClient;
+    }
+
+    private void setupMRCluster() {
+        try {
+            final int jobTrackerPort = findFreePort();
+            final int taskTrackerPort = findFreePort();
+
+            if(jobConf == null)
+                jobConf = new JobConf();
+
+            jobConf.setInt("mapred.submit.replication", 1);
+            //conf.set("hadoop.job.history.location",new File(workDir).getAbsolutePath()+"/history");
+            System.setProperty("hadoop.log.dir",new File(workDir,"/logs").getAbsolutePath());
+
+            mrCluster = new MiniMRCluster(jobTrackerPort,
+                                          taskTrackerPort,
+                                          numTaskTrackers,
+                                          getFileSystem().getUri().toString(),
+                                          numTaskTrackers,
+                                          null,
+                                          null,
+                                          null,
+                                          jobConf);
+
+            jobConf = mrCluster.createJobConf();
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to Setup MR Cluster",e);
+        }
+    }
+
+    private void setupZookeeper() {
+        try {
+            zookeeperDir = new File(workDir,"zk").getAbsolutePath();
+            zookeeperPort = findFreePort();
+            zookeeperCluster = new MiniZooKeeperCluster();
+            zookeeperCluster.setClientPort(zookeeperPort);
+            zookeeperCluster.startup(new File(zookeeperDir));
+        } catch(Exception e) {
+            throw new IllegalStateException("Failed to Setup Zookeeper Cluster",e);
+        }
+    }
+
+    private void setupHBaseCluster() {
+        final int numRegionServers = 1;
+
+        try {
+            hbaseDir = new File(workDir,"hbase").getAbsolutePath();
+            hbaseRoot = "file://" + hbaseDir;
+
+            if(hbaseConf == null)
+                hbaseConf = HBaseConfiguration.create();
+
+            hbaseConf.set("hbase.rootdir", hbaseRoot);
+            hbaseConf.set("hbase.master", "local");
+            hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+            hbaseConf.set("hbase.zookeeper.quorum", "127.0.0.1");
+            hbaseConf.setInt("hbase.master.port", findFreePort());
+            hbaseConf.setInt("hbase.master.info.port", -1);
+            hbaseConf.setInt("hbase.regionserver.port", findFreePort());
+            hbaseConf.setInt("hbase.regionserver.info.port", -1);
+
+            hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers);
+            hbaseConf.set("hbase.master", hbaseCluster.getHMasterAddress().toString());
+            //opening the META table ensures that cluster is running
+            new HTable(hbaseConf, HConstants.META_TABLE_NAME);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to setup HBase Cluster",e);
+        }
+    }
+
+    private void setUpMetastore() throws Exception {
+        if(hiveConf == null)
+            hiveConf = new HiveConf(this.getClass());
+
+        //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
+        //is present only in the ql/test directory
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,"jdbc:derby:"+workDir+"/metastore_db;create=true");
+        //set where derby logs
+        File derbyLogFile = new File(workDir+"/derby.log");
+        derbyLogFile.createNewFile();
+        System.setProperty("derby.stream.error.file",derbyLogFile.getPath());
+
+
+//    Driver driver = new Driver(hiveConf);
+//    SessionState.start(new CliSessionState(hiveConf));
+
+        hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
+    }
+
+    private static int findFreePort() throws IOException {
+        ServerSocket server = new ServerSocket(0);
+        int port = server.getLocalPort();
+        server.close();
+        return port;
+    }
+
+    public static class Builder {
+        private File workDir;
+        private int numTaskTrackers = 1;
+        private JobConf jobConf;
+        private HBaseConfiguration hbaseConf;
+        private HiveConf hiveConf;
+
+        private boolean miniMRClusterEnabled = true;
+        private boolean miniHBaseClusterEnabled = true;
+        private boolean miniHiveMetastoreEnabled = true;
+        private boolean miniZookeeperClusterEnabled = true;
+
+
+        private Builder(File workDir) {
+            this.workDir = workDir;
+        }
+
+        public Builder numTaskTrackers(int num) {
+            numTaskTrackers = num;
+            return this;
+        }
+
+        public Builder jobConf(JobConf jobConf) {
+            this.jobConf = jobConf;
+            return this;
+        }
+
+        public Builder hbaseConf(HBaseConfiguration hbaseConf) {
+            this.hbaseConf = hbaseConf;
+            return this;
+        }
+
+        public Builder hiveConf(HiveConf hiveConf) {
+            this.hiveConf = hiveConf;
+            return this;
+        }
+
+        public Builder miniMRClusterEnabled(boolean enabled) {
+            this.miniMRClusterEnabled = enabled;
+            return this;
+        }
+
+        public Builder miniHBaseClusterEnabled(boolean enabled) {
+            this.miniHBaseClusterEnabled = enabled;
+            return this;
+        }
+
+        public Builder miniZookeeperClusterEnabled(boolean enabled) {
+            this.miniZookeeperClusterEnabled = enabled;
+            return this;
+        }
+
+        public Builder miniHiveMetastoreEnabled(boolean enabled) {
+            this.miniHiveMetastoreEnabled = enabled;
+            return this;
+        }
+
+
+        public ManyMiniCluster build() {
+            return new ManyMiniCluster(this);
+        }
+
+    }
+}
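
For reference, a minimal sketch (not part of this commit) of bringing the composite
mini cluster up and down from code in the same package, as SkeletonHBaseTest.Context
does further down. The work directory path is an illustrative assumption; start() and
stop() are protected, so the caller has to live in org.apache.hcatalog.hbase (or a subclass).

package org.apache.hcatalog.hbase;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class ManyMiniClusterSketch {
    public static void main(String[] args) throws Exception {
        // all four mini services are enabled by default; the builder calls are shown for clarity
        ManyMiniCluster cluster = ManyMiniCluster.create(new File("/tmp/mmc_work")) // illustrative path
                .numTaskTrackers(1)
                .miniHiveMetastoreEnabled(true)
                .build();
        cluster.start();
        try {
            Configuration hbaseConf = cluster.getHBaseConf();   // copy of the MiniHBaseCluster conf
            Configuration jobConf = cluster.getJobConf();       // copy of the MiniMRCluster conf
            HiveMetaStoreClient msClient = cluster.getHiveMetaStoreClient();
            // ... exercise HBase / MapReduce / the metastore here ...
        } finally {
            cluster.stop();
        }
    }
}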

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java Fri Oct  7 23:14:21 2011
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Base class for HBase Tests which need a mini cluster instance
+ */
+public abstract class SkeletonHBaseTest {
+
+    protected static String TEST_DIR = System.getProperty("test.data.dir", "./");
+
+    protected final static String DEFAULT_CONTEXT_HANDLE = "default";
+
+    protected static Map<String,Context> contextMap = new HashMap<String,Context>();
+    protected static Set<String> tableNames = new HashSet<String>();
+
+    protected void createTable(String tableName, String[] families) {
+        try {
+            HBaseAdmin admin = new HBaseAdmin(getHbaseConf());
+            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+            for(String family: families) {
+                HColumnDescriptor columnDescriptor = new HColumnDescriptor(family);
+                tableDesc.addFamily(columnDescriptor);
+            }
+            admin.createTable(tableDesc);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new IllegalStateException(e);
+        }
+
+    }
+
+    protected String newTableName(String prefix) {
+        String name =null;
+        int tries = 100;
+        do {
+            name = prefix+"_"+Math.abs(new Random().nextLong());
+        } while(tableNames.contains(name) && --tries > 0);
+        if(tableNames.contains(name))
+            throw new IllegalStateException("Couldn't find a unique table name, tableNames size: "+tableNames.size());
+        tableNames.add(name);
+        return name;
+    }
+
+    /**
+     * start up an HBase cluster instance before a test suite runs
+     */
+    @BeforeClass
+    public static void setup() {
+        if(!contextMap.containsKey(getContextHandle()))
+            contextMap.put(getContextHandle(),new Context(getContextHandle()));
+
+        contextMap.get(getContextHandle()).start();
+    }
+
+    /**
+     * shut down the HBase cluster instance at the end of the test suite
+     */
+    @AfterClass
+    public static void tearDown() {
+        contextMap.get(getContextHandle()).stop();
+    }
+
+    /**
+     * override this with a different context handle if test suites are run simultaneously
+     * and ManyMiniCluster instances shouldn't be shared
+     * @return the context handle used to look up the shared cluster instance
+     */
+    public static String getContextHandle() {
+        return DEFAULT_CONTEXT_HANDLE;
+    }
+
+    /**
+     * @return working directory for a given test context, which normally is a test suite
+     */
+    public String getTestDir() {
+        return contextMap.get(getContextHandle()).getTestDir();
+    }
+
+    /**
+     * @return ManyMiniCluster instance
+     */
+    public ManyMiniCluster getCluster() {
+        return contextMap.get(getContextHandle()).getCluster();
+    }
+
+    /**
+     * @return configuration of MiniHBaseCluster
+     */
+    public Configuration getHbaseConf() {
+        return contextMap.get(getContextHandle()).getHbaseConf();
+    }
+
+    /**
+     * @return configuration of MiniMRCluster
+     */
+    public Configuration getJobConf() {
+        return contextMap.get(getContextHandle()).getJobConf();
+    }
+
+    /**
+     * @return configuration of Hive Metastore
+     */
+    public HiveConf getHiveConf() {
+        return contextMap.get(getContextHandle()).getHiveConf();
+    }
+
+    /**
+     * @return filesystem used by ManyMiniCluster daemons
+     */
+    public FileSystem getFileSystem() {
+        return contextMap.get(getContextHandle()).getFileSystem();
+    }
+
+    /**
+     * class used to encapsulate a context, normally used by a single test suite
+     * or shared across test suites when multi-threaded testing is turned on
+     */
+    public static class Context {
+        protected String testDir;
+        protected ManyMiniCluster cluster;
+
+        protected Configuration hbaseConf;
+        protected Configuration jobConf;
+        protected HiveConf hiveConf;
+
+        protected FileSystem fileSystem;
+
+        protected int usageCount = 0;
+
+        public Context(String handle) {
+            try {
+                testDir = new File(TEST_DIR+"/test_"+handle+"_"+Math.abs(new Random().nextLong())+"/").getCanonicalPath();
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to generate testDir",e);
+            }
+            System.out.println("Cluster work directory: "+testDir);
+        }
+
+        public void start() {
+            if(usageCount++ == 0) {
+                cluster = ManyMiniCluster.create(new File(testDir)).build();
+                cluster.start();
+                hbaseConf = cluster.getHBaseConf();
+                jobConf = cluster.getJobConf();
+                fileSystem = cluster.getFileSystem();
+                hiveConf = cluster.getHiveConf();
+            }
+        }
+
+        public void stop() {
+            if( --usageCount == 0)  {
+                try {
+                    cluster.stop();
+                    cluster = null;
+                } finally {
+                    System.out.println("Trying to cleanup: "+testDir);
+                    try {
+                        FileUtil.fullyDelete(new File(testDir));
+                    } catch (IOException e) {
+                        throw new IllegalStateException("Failed to cleanup test dir",e);
+                    }
+                }
+            }
+        }
+
+        public String getTestDir() {
+            return testDir;
+        }
+
+        public ManyMiniCluster getCluster() {
+            return cluster;
+        }
+
+        public Configuration getHbaseConf() {
+            return hbaseConf;
+        }
+
+        public Configuration getJobConf() {
+            return jobConf;
+        }
+
+        public HiveConf getHiveConf() {
+            return hiveConf;
+        }
+
+        public FileSystem getFileSystem() {
+            return fileSystem;
+        }
+    }
+
+}
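
For reference, a minimal sketch (not part of this commit) of the smallest useful
subclass: the shared cluster comes up in @BeforeClass, and the test only asks the base
class for table names and configuration. It follows the plain put/get pattern of
TestHBaseDirectOutputStorageDriver further down; table, family and cell values are illustrative.

package org.apache.hcatalog.hbase;

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class ExampleSkeletonTest extends SkeletonHBaseTest {

    @Test
    public void putAndGet() throws Exception {
        String tableName = newTableName("example");        // unique name tracked by the base class
        createTable(tableName, new String[]{"f"});         // cluster is already running via @BeforeClass
        HTable table = new HTable(getHbaseConf(), tableName);
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        table.put(put);
        Result result = table.get(new Get(Bytes.toBytes("row1")));
        assertTrue(Bytes.equals(Bytes.toBytes("v"),
                result.getValue(Bytes.toBytes("f"), Bytes.toBytes("q"))));
    }
}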

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java Fri Oct  7 23:14:21 2011
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test HBaseDirectOutputStorageDriver and HBaseDirectOutputFormat using a MiniCluster
+ */
+public class TestHBaseDirectOutputStorageDriver extends SkeletonHBaseTest {
+
+    private void registerHBaseTable(String tableName) throws Exception {
+
+        String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+        HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf());
+
+        try {
+            client.dropTable(databaseName, tableName);
+        } catch(Exception e) {
+        } //ignore: dropTable can fail with NoSuchObjectException if the table doesn't exist
+
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+        StorageDescriptor sd = new StorageDescriptor();
+
+        sd.setCols(getTableColumns());
+        tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+
+        tbl.setSd(sd);
+
+        sd.setBucketCols(new ArrayList<String>(2));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters().put(
+                Constants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
+        sd.setInputFormat("fillme");
+        sd.setOutputFormat(HBaseDirectOutputFormat.class.getName());
+
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme");
+        tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseDirectOutputStorageDriver.class.getName());
+        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish");
+        tbl.setParameters(tableParams);
+
+        client.createTable(tbl);
+    }
+
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""));
+        fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    private static  List<HCatFieldSchema> generateDataColumns() throws HCatException {
+        List<HCatFieldSchema> dataColumns = new ArrayList<HCatFieldSchema>();
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")));
+        return dataColumns;
+    }
+
+    public void test() throws IOException {
+        Configuration conf = getHbaseConf();
+        String tableName = "my_table";
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+        createTable(tableName,new String[]{familyName});
+        HTable table = new HTable(getHbaseConf(),tableNameBytes);
+        byte[] key = Bytes.toBytes("foo");
+        byte[] qualifier = Bytes.toBytes("qualifier");
+        byte[] val = Bytes.toBytes("bar");
+        Put put = new Put(key);
+        put.add(familyNameBytes, qualifier, val);
+        table.put(put);
+        Result result = table.get(new Get(key));
+        assertTrue(Bytes.equals(val, result.getValue(familyNameBytes, qualifier)));
+    }
+
+    @Test
+    public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String tableName = newTableName("mrTest");
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(getJobConf());
+        for(Map.Entry<String,String> el: getHbaseConf()) {
+            if(el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(),el.getValue());
+            }
+        }
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:ONE,spanish:DOS",
+                "3,english:ONE,spanish:TRES"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf, "hcat mapreduce write test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(HBaseDirectOutputFormat.class);
+        job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Put.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes("my_family"));
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        assertEquals(data.length,index);
+    }
+
+    public static class MapWrite extends Mapper<LongWritable, Text, BytesWritable, Put> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"),
+                        Bytes.toBytes(pair[0]),
+                        Bytes.toBytes(pair[1]));
+            }
+            context.write(new BytesWritable(Bytes.toBytes(vals[0])),put);
+        }
+    }
+
+    @Test
+    public void directOutputStorageDriverTest() throws Exception {
+        String tableName = newTableName("mrtest");
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(getJobConf());
+        for(Map.Entry<String,String> el: getHbaseConf()) {
+            if(el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(),el.getValue());
+            }
+        }
+
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+        registerHBaseTable(tableName);
+
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:ONE,spanish:DOS",
+                "3,english:ONE,spanish:TRES"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf, "hcat mapreduce write test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+        outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes("my_family"));
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                assertEquals(1l,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+            }
+            index++;
+        }
+        assertEquals(data.length,index);
+    }
+
+    public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            HCatRecord record = new DefaultHCatRecord(3);
+            HCatSchema schema = new HCatSchema(generateDataColumns());
+            String vals[] = value.toString().split(",");
+            record.setInteger("key",schema,Integer.parseInt(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                record.set(pair[0],schema,pair[1]);
+            }
+            context.write(null,record);
+        }
+    }
+}