Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/10/07 23:14:21 UTC
svn commit: r1180275 - in /incubator/hcatalog/trunk: ./
storage-drivers/hbase/
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/
storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Author: khorgath
Date: Fri Oct 7 23:14:21 2011
New Revision: 1180275
URL: http://svn.apache.org/viewvc?rev=1180275&view=rev
Log:
HCATALOG-73 Output Storage Driver for HBase (Direct PUTs) (toffer via khorgath)
Added:
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/hbase/build.xml
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1180275&r1=1180274&r2=1180275&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Oct 7 23:14:21 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-73. Output Storage Driver for HBase (Direct PUTs) (toffer via khorgath)
+
HCAT-74. ResultConverter for HBase Storage Drivers (avandana via khorgath)
HCAT-89. Support for creating non-native tables (avandana via gates)
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/build.xml?rev=1180275&r1=1180274&r2=1180275&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/build.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/build.xml Fri Oct 7 23:14:21 2011
@@ -70,6 +70,7 @@
<property name="test.all.file" value="${test.src.dir}/all-tests"/>
<property name="test.exclude.file" value="${test.src.dir}/excluded-tests"/>
<property name="test.output" value="no"/>
+ <property name="test.warehouse.dir" value="/tmp/hcat_junit_warehouse"/>
<property name="hive.conf.dir" value="${hive.root}/conf"/>
<!-- ivy properties set here -->
@@ -241,9 +242,12 @@
<delete dir="${test.log.dir}"/>
<mkdir dir="${test.log.dir}"/>
+ <delete dir="${test.warehouse.dir}"/>
+ <mkdir dir="${test.warehouse.dir}"/>
<junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}"
errorProperty="tests.failed" failureProperty="tests.failed">
+ <sysproperty key="hive.metastore.warehouse.dir" value="${test.warehouse.dir}"/>
<classpath>
<pathelement location="${test.build.classes}" />
<pathelement location="." />
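The new <sysproperty> makes the warehouse location visible inside the forked JUnit JVMs; a one-line Java sketch (not part of this commit) of how a test could read it, with the default taken from the property definition above:

    // Sketch: the forked test JVM sees the dir passed via <sysproperty> above.
    String warehouseDir = System.getProperty("hive.metastore.warehouse.dir", "/tmp/hcat_junit_warehouse");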
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Fri Oct 7 23:14:21 2011
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+import java.io.IOException;
+
+/**
+ * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put API to write each row to HBase one a
+ * time. Presently it is just using TableOutputFormat as the underlying implementation in the future we can
+ * tune this to make the writes faster such as permanently disabling WAL, caching, etc.
+ */
+class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>,Writable> implements Configurable {
+
+ private TableOutputFormat<WritableComparable<?>> outputFormat;
+
+ public HBaseDirectOutputFormat() {
+ this.outputFormat = new TableOutputFormat<WritableComparable<?>>();
+ }
+
+ @Override
+ public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return outputFormat.getRecordWriter(context);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ outputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return outputFormat.getOutputCommitter(context);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ String tableName = conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY);
+ conf = new Configuration(conf);
+ conf.set(TableOutputFormat.OUTPUT_TABLE,tableName);
+ outputFormat.setConf(conf);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return outputFormat.getConf();
+ }
+}
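For reference, a minimal map-only job wiring for this format, mirroring the usage in directOutputFormatTest() further below. Sketch only, not part of this commit: the class is package-private, so callers must live in org.apache.hcatalog.hbase, the table name is illustrative, and the input format/mapper setup is omitted.

    // Sketch; mirrors directOutputFormatTest() below.
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "direct puts example");
    job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, "my_table");
    job.setOutputFormatClass(HBaseDirectOutputFormat.class);
    job.setMapOutputValueClass(Put.class);   // mappers emit HBase Puts directly
    job.setNumReduceTasks(0);                // map-only write path
    job.waitForCompletion(true);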
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java Fri Oct 7 23:14:21 2011
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * HBase storage driver implementation which uses "direct" writes (HTable puts) to HBase for writing out records.
+ */
+public class HBaseDirectOutputStorageDriver extends HCatOutputStorageDriver {
+ private HCatTableInfo tableInfo;
+ private HBaseDirectOutputFormat outputFormat;
+ private ResultConverter converter;
+ private OutputJobInfo outputJobInfo;
+ private HCatSchema schema;
+ private HCatSchema outputSchema;
+
+ @Override
+ public void initialize(JobContext context, Properties hcatProperties) throws IOException {
+ super.initialize(context, hcatProperties);
+ String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ if( jobString == null ) {
+ throw new IOException("OutputJobInfo information not found in JobContext. HCatOutputFormat.setOutput() not called?");
+ }
+ outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+ tableInfo = outputJobInfo.getTableInfo();
+ schema = tableInfo.getDataColumns();
+
+ List<FieldSchema> fields = HCatUtil.getFieldSchemaList(outputSchema.getFields());
+ hcatProperties.setProperty(Constants.LIST_COLUMNS,
+ MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+ hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+ MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+
+ //override table properties with user defined ones
+ //in the future we should be more selective on what to override
+ hcatProperties.putAll(outputJobInfo.getProperties());
+ //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize being called
+ converter = new HBaseSerDeResultConverter(schema,
+ outputSchema,
+ hcatProperties);
+ context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,tableInfo.getTableName());
+ outputFormat = new HBaseDirectOutputFormat();
+ outputFormat.setConf(context.getConfiguration());
+ }
+
+ @Override
+ public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+ return outputFormat;
+ }
+
+ @Override
+ public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException {
+ this.outputSchema = schema;
+ }
+
+ @Override
+ public WritableComparable<?> generateKey(HCatRecord value) throws IOException {
+ //HBase doesn't use KEY as part of output
+ return null;
+ }
+
+ @Override
+ public Writable convertValue(HCatRecord value) throws IOException {
+ return converter.convert(value);
+ }
+
+ @Override
+ public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException {
+ //no partitions for this driver
+ }
+
+ @Override
+ public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void setOutputPath(JobContext jobContext, String location) throws IOException {
+ //no output path
+ }
+
+ @Override
+ public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+ return null;
+ }
+}
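Client jobs never name this driver directly; HCatOutputFormat resolves it from the table's HCAT_OSD_CLASS parameter, as registerHBaseTable() in the test below sets up. A minimal sketch of the client side, not part of this commit (table name illustrative, `conf` assumed configured as in the test):

    // Sketch; mirrors directOutputStorageDriverTest() below.
    Job job = new Job(conf, "hcat write example");
    job.setOutputFormatClass(HCatOutputFormat.class);
    OutputJobInfo info = OutputJobInfo.create(null, "my_table", null, null, null);
    HCatOutputFormat.setOutput(job, info);
    job.setMapOutputValueClass(HCatRecord.class);   // mappers emit HCatRecords
    job.setNumReduceTasks(0);                       // map-only; no partitions for this driver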
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java Fri Oct 7 23:14:21 2011
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Map;
+
+/**
+ * MiniCluster class composed of a number of Hadoop Minicluster implementations
+ * and other daemons needed for testing (HBase, Hive MetaStore, ZooKeeper, MiniMRCluster)
+ */
+public class ManyMiniCluster {
+
+ //MR stuff
+ private boolean miniMRClusterEnabled;
+ private MiniMRCluster mrCluster;
+ private int numTaskTrackers;
+ private JobConf jobConf;
+
+ //HBase stuff
+ private boolean miniHBaseClusterEnabled;
+ private MiniHBaseCluster hbaseCluster;
+ private String hbaseRoot;
+ private Configuration hbaseConf;
+ private String hbaseDir;
+
+ //ZK Stuff
+ private boolean miniZookeeperClusterEnabled;
+ private MiniZooKeeperCluster zookeeperCluster;
+ private int zookeeperPort;
+ private String zookeeperDir;
+
+ //DFS Stuff
+ private MiniDFSCluster dfsCluster;
+
+ //Hive Stuff
+ private boolean miniHiveMetastoreEnabled;
+ private HiveConf hiveConf;
+ private HiveMetaStoreClient hiveMetaStoreClient;
+
+ private final File workDir;
+ private boolean started = false;
+
+
+ /**
+ * create a cluster instance using a builder which exposes the configurable options
+ * @param workDir working directory ManyMiniCluster will use for all of its *MiniCluster instances
+ * @return a Builder instance
+ */
+ public static Builder create(File workDir) {
+ return new Builder(workDir);
+ }
+
+ private ManyMiniCluster(Builder b) {
+ workDir = b.workDir;
+ numTaskTrackers = b.numTaskTrackers;
+ hiveConf = b.hiveConf;
+ jobConf = b.jobConf;
+ hbaseConf = b.hbaseConf;
+ miniMRClusterEnabled = b.miniMRClusterEnabled;
+ miniHBaseClusterEnabled = b.miniHBaseClusterEnabled;
+ miniHiveMetastoreEnabled = b.miniHiveMetastoreEnabled;
+ miniZookeeperClusterEnabled = b.miniZookeeperClusterEnabled;
+ }
+
+ protected synchronized void start() {
+ try {
+ if (!started) {
+ FileUtil.fullyDelete(workDir);
+ if(miniMRClusterEnabled) {
+ setupMRCluster();
+ }
+ if(miniZookeeperClusterEnabled || miniHBaseClusterEnabled) {
+ miniZookeeperClusterEnabled = true;
+ setupZookeeper();
+ }
+ if(miniHBaseClusterEnabled) {
+ setupHBaseCluster();
+ }
+ if(miniHiveMetastoreEnabled) {
+ setUpMetastore();
+ }
+ }
+ } catch(Exception e) {
+ throw new IllegalStateException("Failed to setup cluster",e);
+ }
+ }
+
+ protected synchronized void stop() {
+ if (hbaseCluster != null) {
+ HConnectionManager.deleteAllConnections(true);
+ try {
+ hbaseCluster.shutdown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ hbaseCluster = null;
+ }
+ if (zookeeperCluster != null) {
+ try {
+ zookeeperCluster.shutdown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ zookeeperCluster = null;
+ }
+ if(mrCluster != null) {
+ try {
+ mrCluster.shutdown();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ mrCluster = null;
+ }
+ if(dfsCluster != null) {
+ try {
+ dfsCluster.getFileSystem().close();
+ dfsCluster.shutdown();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ dfsCluster = null;
+ }
+ try {
+ FileSystem.closeAll();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ started = false;
+ }
+
+ /**
+ * @return Configuration of mini HBase cluster
+ */
+ public Configuration getHBaseConf() {
+ return HBaseConfiguration.create(hbaseConf);
+ }
+
+ /**
+ * @return Configuration of mini MR cluster
+ */
+ public Configuration getJobConf() {
+ return new Configuration(jobConf);
+ }
+
+ /**
+ * @return Configuration of the Hive metastore; this is a standalone (embedded) metastore, not a daemon
+ */
+ public HiveConf getHiveConf() {
+ return new HiveConf(hiveConf);
+ }
+
+ /**
+ * @return Filesystem used by MiniMRCluster and MiniHBaseCluster
+ */
+ public FileSystem getFileSystem() {
+ try {
+ return FileSystem.get(jobConf);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to get FileSystem",e);
+ }
+ }
+
+ /**
+ * @return Metastore client instance
+ */
+ public HiveMetaStoreClient getHiveMetaStoreClient() {
+ return hiveMetaStoreClient;
+ }
+
+ private void setupMRCluster() {
+ try {
+ final int jobTrackerPort = findFreePort();
+ final int taskTrackerPort = findFreePort();
+
+ if(jobConf == null)
+ jobConf = new JobConf();
+
+ jobConf.setInt("mapred.submit.replication", 1);
+ //conf.set("hadoop.job.history.location",new File(workDir).getAbsolutePath()+"/history");
+ System.setProperty("hadoop.log.dir",new File(workDir,"/logs").getAbsolutePath());
+
+ mrCluster = new MiniMRCluster(jobTrackerPort,
+ taskTrackerPort,
+ numTaskTrackers,
+ getFileSystem().getUri().toString(),
+ numTaskTrackers,
+ null,
+ null,
+ null,
+ jobConf);
+
+ jobConf = mrCluster.createJobConf();
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to Setup MR Cluster",e);
+ }
+ }
+
+ private void setupZookeeper() {
+ try {
+ zookeeperDir = new File(workDir,"zk").getAbsolutePath();
+ zookeeperPort = findFreePort();
+ zookeeperCluster = new MiniZooKeeperCluster();
+ zookeeperCluster.setClientPort(zookeeperPort);
+ zookeeperCluster.startup(new File(zookeeperDir));
+ } catch(Exception e) {
+ throw new IllegalStateException("Failed to Setup Zookeeper Cluster",e);
+ }
+ }
+
+ private void setupHBaseCluster() {
+ final int numRegionServers = 1;
+
+ try {
+ hbaseDir = new File(workDir,"hbase").getAbsolutePath();
+ hbaseRoot = "file://" + hbaseDir;
+
+ if(hbaseConf == null)
+ hbaseConf = HBaseConfiguration.create();
+
+ hbaseConf.set("hbase.rootdir", hbaseRoot);
+ hbaseConf.set("hbase.master", "local");
+ hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+ hbaseConf.set("hbase.zookeeper.quorum", "127.0.0.1");
+ hbaseConf.setInt("hbase.master.port", findFreePort());
+ hbaseConf.setInt("hbase.master.info.port", -1);
+ hbaseConf.setInt("hbase.regionserver.port", findFreePort());
+ hbaseConf.setInt("hbase.regionserver.info.port", -1);
+
+ hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers);
+ hbaseConf.set("hbase.master", hbaseCluster.getHMasterAddress().toString());
+ //opening the META table ensures that the cluster is actually running
+ new HTable(hbaseConf, HConstants.META_TABLE_NAME);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to setup HBase Cluster",e);
+ }
+ }
+
+ private void setUpMetastore() throws Exception {
+ if(hiveConf == null)
+ hiveConf = new HiveConf(this.getClass());
+
+ //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
+ //is present only in the ql/test directory
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,"jdbc:derby:"+workDir+"/metastore_db;create=true");
+ //set where derby logs
+ File derbyLogFile = new File(workDir+"/derby.log");
+ derbyLogFile.createNewFile();
+ System.setProperty("derby.stream.error.file",derbyLogFile.getPath());
+
+
+// Driver driver = new Driver(hiveConf);
+// SessionState.start(new CliSessionState(hiveConf));
+
+ hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
+ }
+
+ private static int findFreePort() throws IOException {
+ ServerSocket server = new ServerSocket(0);
+ int port = server.getLocalPort();
+ server.close();
+ return port;
+ }
+
+ public static class Builder {
+ private File workDir;
+ private int numTaskTrackers = 1;
+ private JobConf jobConf;
+ private HBaseConfiguration hbaseConf;
+ private HiveConf hiveConf;
+
+ private boolean miniMRClusterEnabled = true;
+ private boolean miniHBaseClusterEnabled = true;
+ private boolean miniHiveMetastoreEnabled = true;
+ private boolean miniZookeeperClusterEnabled = true;
+
+
+ private Builder(File workDir) {
+ this.workDir = workDir;
+ }
+
+ public Builder numTaskTrackers(int num) {
+ numTaskTrackers = num;
+ return this;
+ }
+
+ public Builder jobConf(JobConf jobConf) {
+ this.jobConf = jobConf;
+ return this;
+ }
+
+ public Builder hbaseConf(HBaseConfiguration hbaseConf) {
+ this.hbaseConf = hbaseConf;
+ return this;
+ }
+
+ public Builder hiveConf(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ return this;
+ }
+
+ public Builder miniMRClusterEnabled(boolean enabled) {
+ this.miniMRClusterEnabled = enabled;
+ return this;
+ }
+
+ public Builder miniHBaseClusterEnabled(boolean enabled) {
+ this.miniHBaseClusterEnabled = enabled;
+ return this;
+ }
+
+ public Builder miniZookeeperClusterEnabled(boolean enabled) {
+ this.miniZookeeperClusterEnabled = enabled;
+ return this;
+ }
+
+ public Builder miniHiveMetastoreEnabled(boolean enabled) {
+ this.miniHiveMetastoreEnabled = enabled;
+ return this;
+ }
+
+
+ public ManyMiniCluster build() {
+ return new ManyMiniCluster(this);
+ }
+
+ }
+}
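A typical lifecycle, using only the Builder options defined above. Sketch only, not part of this commit: the work directory is illustrative, and since start()/stop() are protected this runs from within org.apache.hcatalog.hbase, as SkeletonHBaseTest.Context does below.

    // Sketch; every setter shown exists on Builder above.
    ManyMiniCluster cluster = ManyMiniCluster.create(new File("/tmp/mmc_work"))
            .numTaskTrackers(1)               // the default, shown for clarity
            .miniHiveMetastoreEnabled(true)   // also the default
            .build();
    cluster.start();                          // brings up MR, ZooKeeper, HBase, metastore
    Configuration hbaseConf = cluster.getHBaseConf();
    HiveMetaStoreClient msc = cluster.getHiveMetaStoreClient();
    cluster.stop();                           // tears everything down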
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java Fri Oct 7 23:14:21 2011
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Base class for HBase Tests which need a mini cluster instance
+ */
+public abstract class SkeletonHBaseTest {
+
+ protected static String TEST_DIR = System.getProperty("test.data.dir", "./");
+
+ protected final static String DEFAULT_CONTEXT_HANDLE = "default";
+
+ protected static Map<String,Context> contextMap = new HashMap<String,Context>();
+ protected static Set<String> tableNames = new HashSet<String>();
+
+ protected void createTable(String tableName, String[] families) {
+ try {
+ HBaseAdmin admin = new HBaseAdmin(getHbaseConf());
+ HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+ for(String family: families) {
+ HColumnDescriptor columnDescriptor = new HColumnDescriptor(family);
+ tableDesc.addFamily(columnDescriptor);
+ }
+ admin.createTable(tableDesc);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IllegalStateException(e);
+ }
+
+ }
+
+ protected String newTableName(String prefix) {
+ String name =null;
+ int tries = 100;
+ do {
+ name = prefix+"_"+Math.abs(new Random().nextLong());
+ } while(tableNames.contains(name) && --tries > 0);
+ if(tableNames.contains(name))
+ throw new IllegalStateException("Couldn't find a unique table name, tableNames size: "+tableNames.size());
+ tableNames.add(name);
+ return name;
+ }
+
+ /**
+ * start up an HBase cluster instance before a test suite runs
+ */
+ @BeforeClass
+ public static void setup() {
+ if(!contextMap.containsKey(getContextHandle()))
+ contextMap.put(getContextHandle(),new Context(getContextHandle()));
+
+ contextMap.get(getContextHandle()).start();
+ }
+
+ /**
+ * shut down the HBase cluster instance at the end of the test suite
+ */
+ @AfterClass
+ public static void tearDown() {
+ contextMap.get(getContextHandle()).stop();
+ }
+
+ /**
+ * override this with a different context handle if test suites are run simultaneously
+ * and ManyMiniCluster instances shouldn't be shared
+ * @return the handle used to look up this suite's cluster context
+ */
+ public static String getContextHandle() {
+ return DEFAULT_CONTEXT_HANDLE;
+ }
+
+ /**
+ * @return working directory for a given test context, which normally is a test suite
+ */
+ public String getTestDir() {
+ return contextMap.get(getContextHandle()).getTestDir();
+ }
+
+ /**
+ * @return ManyMiniCluster instance
+ */
+ public ManyMiniCluster getCluster() {
+ return contextMap.get(getContextHandle()).getCluster();
+ }
+
+ /**
+ * @return configuration of MiniHBaseCluster
+ */
+ public Configuration getHbaseConf() {
+ return contextMap.get(getContextHandle()).getHbaseConf();
+ }
+
+ /**
+ * @return configuration of MiniMRCluster
+ */
+ public Configuration getJobConf() {
+ return contextMap.get(getContextHandle()).getJobConf();
+ }
+
+ /**
+ * @return configuration of Hive Metastore
+ */
+ public HiveConf getHiveConf() {
+ return contextMap.get(getContextHandle()).getHiveConf();
+ }
+
+ /**
+ * @return filesystem used by ManyMiniCluster daemons
+ */
+ public FileSystem getFileSystem() {
+ return contextMap.get(getContextHandle()).getFileSystem();
+ }
+
+ /**
+ * encapsulates a cluster context, normally used by a single test suite,
+ * or shared across suites when multi-threaded testing is turned on
+ */
+ public static class Context {
+ protected String testDir;
+ protected ManyMiniCluster cluster;
+
+ protected Configuration hbaseConf;
+ protected Configuration jobConf;
+ protected HiveConf hiveConf;
+
+ protected FileSystem fileSystem;
+
+ protected int usageCount = 0;
+
+ public Context(String handle) {
+ try {
+ testDir = new File(TEST_DIR+"/test_"+handle+"_"+Math.abs(new Random().nextLong())+"/").getCanonicalPath();
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to generate testDir",e);
+ }
+ System.out.println("Cluster work directory: "+testDir);
+ }
+
+ public void start() {
+ if(usageCount++ == 0) {
+ cluster = ManyMiniCluster.create(new File(testDir)).build();
+ cluster.start();
+ hbaseConf = cluster.getHBaseConf();
+ jobConf = cluster.getJobConf();
+ fileSystem = cluster.getFileSystem();
+ hiveConf = cluster.getHiveConf();
+ }
+ }
+
+ public void stop() {
+ if( --usageCount == 0) {
+ try {
+ cluster.stop();
+ cluster = null;
+ } finally {
+ System.out.println("Trying to cleanup: "+testDir);
+ try {
+ FileUtil.fullyDelete(new File(testDir));
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to cleanup test dir",e);
+ }
+ }
+ }
+ }
+
+ public String getTestDir() {
+ return testDir;
+ }
+
+ public ManyMiniCluster getCluster() {
+ return cluster;
+ }
+
+ public Configuration getHbaseConf() {
+ return hbaseConf;
+ }
+
+ public Configuration getJobConf() {
+ return jobConf;
+ }
+
+ public HiveConf getHiveConf() {
+ return hiveConf;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+ }
+
+}
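Concrete suites extend this skeleton and lean on the inherited helpers; a minimal hypothetical example, not part of this commit (suite, table, and family names illustrative):

    // Sketch; uses only helpers defined on SkeletonHBaseTest above.
    public class TestSomething extends SkeletonHBaseTest {
        @Test
        public void putAndScan() throws Exception {
            String table = newTableName("putAndScan");        // unique name per run
            createTable(table, new String[]{"my_family"});    // via the shared HBase conf
            HTable htable = new HTable(getHbaseConf(), table);
            // ... drive jobs with getJobConf()/getHiveConf(), stage files via getFileSystem()
        }
    }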
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1180275&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java Fri Oct 7 23:14:21 2011
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test HBaseDirectOutputStorageDriver and HBaseDirectOutputFormat using a MiniCluster
+ */
+public class TestHBaseDirectOutputStorageDriver extends SkeletonHBaseTest {
+
+ private void registerHBaseTable(String tableName) throws Exception {
+
+ String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME ;
+ HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf());
+
+ try {
+ client.dropTable(databaseName, tableName);
+ } catch(Exception e) {
+ } //can fail with NoSuchObjectException
+
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+ StorageDescriptor sd = new StorageDescriptor();
+
+ sd.setCols(getTableColumns());
+ tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+
+ tbl.setSd(sd);
+
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(
+ Constants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
+ sd.setInputFormat("fillme");
+ sd.setOutputFormat(HBaseDirectOutputFormat.class.getName());
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme");
+ tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseDirectOutputStorageDriver.class.getName());
+ tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish");
+ tbl.setParameters(tableParams);
+
+ client.createTable(tbl);
+ }
+
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ private static List<HCatFieldSchema> generateDataColumns() throws HCatException {
+ List<HCatFieldSchema> dataColumns = new ArrayList<HCatFieldSchema>();
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")));
+ return dataColumns;
+ }
+
+ @Test
+ public void test() throws IOException {
+ Configuration conf = getHbaseConf();
+ String tableName = "my_table";
+ byte[] tableNameBytes = Bytes.toBytes(tableName);
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+ createTable(tableName,new String[]{familyName});
+ HTable table = new HTable(getHbaseConf(),tableNameBytes);
+ byte[] key = Bytes.toBytes("foo");
+ byte[] qualifier = Bytes.toBytes("qualifier");
+ byte[] val = Bytes.toBytes("bar");
+ Put put = new Put(key);
+ put.add(familyNameBytes, qualifier, val);
+ table.put(put);
+ Result result = table.get(new Get(key));
+ assertTrue(Bytes.equals(val, result.getValue(familyNameBytes, qualifier)));
+ }
+
+ @Test
+ public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+ String tableName = newTableName("mrTest");
+ byte[] tableNameBytes = Bytes.toBytes(tableName);
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(getJobConf());
+ for(Map.Entry<String,String> el: getHbaseConf()) {
+ if(el.getKey().startsWith("hbase.")) {
+ conf.set(el.getKey(),el.getValue());
+ }
+ }
+
+ //create table
+ createTable(tableName,new String[]{familyName});
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
+
+
+
+ // input/output settings
+ Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+ for(String line: data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+
+ //create job
+ Job job = new Job(conf, "hcat mapreduce write test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+ job.setOutputFormatClass(HBaseDirectOutputFormat.class);
+ job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Put.class);
+
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Put.class);
+
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes("my_family"));
+ ResultScanner scanner = table.getScanner(scan);
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ assertEquals(data.length,index);
+ }
+
+ public static class MapWrite extends Mapper<LongWritable, Text, BytesWritable, Put> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String vals[] = value.toString().split(",");
+ Put put = new Put(Bytes.toBytes(vals[0]));
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ put.add(Bytes.toBytes("my_family"),
+ Bytes.toBytes(pair[0]),
+ Bytes.toBytes(pair[1]));
+ }
+ context.write(new BytesWritable(Bytes.toBytes(vals[0])),put);
+ }
+ }
+
+ @Test
+ public void directOutputStorageDriverTest() throws Exception {
+ String tableName = newTableName("mrtest");
+ byte[] tableNameBytes = Bytes.toBytes(tableName);
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(getJobConf());
+ for(Map.Entry<String,String> el: getHbaseConf()) {
+ if(el.getKey().startsWith("hbase.")) {
+ conf.set(el.getKey(),el.getValue());
+ }
+ }
+
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+ //create table
+ createTable(tableName,new String[]{familyName});
+ registerHBaseTable(tableName);
+
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
+
+
+
+ // input/output settings
+ Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+ for(String line: data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+
+ //create job
+ Job job = new Job(conf, "hcat mapreduce write test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapHCatWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+ outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
+ HCatOutputFormat.setOutput(job,outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Put.class);
+
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes("my_family"));
+ ResultScanner scanner = table.getScanner(scan);
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+ assertEquals(1L,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+ }
+ index++;
+ }
+ assertEquals(data.length,index);
+ }
+
+ public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ HCatRecord record = new DefaultHCatRecord(3);
+ HCatSchema schema = new HCatSchema(generateDataColumns());
+ String vals[] = value.toString().split(",");
+ record.setInteger("key",schema,Integer.parseInt(vals[0]));
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ record.set(pair[0],schema,pair[1]);
+ }
+ context.write(null,record);
+ }
+ }
+}