Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC
svn commit: r1520466 [12/18] - in /hive/trunk/hcatalog:
core/src/main/java/org/apache/hcatalog/cli/
core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/
core/src/main/java/org/apache/hcatalog/common/
core/src/main/java/org/apache/hcatalog/data/...
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestMultiOutputFormat {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestMultiOutputFormat.class);
+ private static File workDir;
+ private static Configuration mrConf = null;
+ private static FileSystem fs = null;
+ private static MiniMRCluster mrCluster = null;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ createWorkDir();
+ Configuration conf = new Configuration(true);
+ conf.set("yarn.scheduler.capacity.root.queues", "default");
+ conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+
+ fs = FileSystem.get(conf);
+ System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
+ // LocalJobRunner does not work with the mapreduce OutputCommitter, so a
+ // MiniMRCluster is needed. See MAPREDUCE-2350.
+ mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
+ mrConf = mrCluster.createJobConf();
+ }
+
+ private static void createWorkDir() throws IOException {
+ String testDir = System.getProperty("test.data.dir", "./");
+ testDir = testDir + "/test_multiout_" + Math.abs(new Random().nextLong()) + "/";
+ workDir = new File(new File(testDir).getCanonicalPath());
+ FileUtil.fullyDelete(workDir);
+ workDir.mkdirs();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ FileUtil.fullyDelete(workDir);
+ }
+
+ /**
+ * A test job that reads an input file and writes each word and the index at
+ * which it was encountered to a text file and a sequence file, using
+ * different key/value types for each output.
+ */
+ @Test
+ public void testMultiOutputFormatWithoutReduce() throws Throwable {
+ Job job = new Job(mrConf, "MultiOutNoReduce");
+ job.setMapperClass(MultiOutWordIndexMapper.class);
+ job.setJarByClass(this.getClass());
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(MultiOutputFormat.class);
+ job.setNumReduceTasks(0);
+
+ JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+ configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
+ configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class,
+ IntWritable.class);
+ Path outDir = new Path(workDir.getPath(), job.getJobName());
+ FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1"));
+ FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2"));
+
+ String fileContent = "Hello World";
+ String inputFile = createInputFile(fileContent);
+ FileInputFormat.setInputPaths(job, new Path(inputFile));
+
+ //Test for merging of configs
+ DistributedCache.addFileToClassPath(new Path(inputFile), job.getConfiguration(), fs);
+ String dummyFile = createInputFile("dummy file");
+ DistributedCache.addFileToClassPath(new Path(dummyFile), configurer.getJob("out1")
+ .getConfiguration(), fs);
+ // Add a duplicate of an existing value; merging should remove duplicates
+ DistributedCache.addFileToClassPath(new Path(inputFile), configurer.getJob("out2")
+ .getConfiguration(), fs);
+
+ configurer.configure();
+
+ // Verify that the configs are merged
+ Path[] fileClassPaths = DistributedCache.getFileClassPaths(job.getConfiguration());
+ List<Path> fileClassPathsList = Arrays.asList(fileClassPaths);
+ Assert.assertTrue(fileClassPathsList.contains(new Path(inputFile)));
+ Assert.assertTrue(fileClassPathsList.contains(new Path(dummyFile)));
+
+ URI[] cacheFiles = DistributedCache.getCacheFiles(job.getConfiguration());
+ List<URI> cacheFilesList = Arrays.asList(cacheFiles);
+ Assert.assertTrue(cacheFilesList.contains(new Path(inputFile).makeQualified(fs).toUri()));
+ Assert.assertTrue(cacheFilesList.contains(new Path(dummyFile).makeQualified(fs).toUri()));
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ Path textOutPath = new Path(outDir, "out1/part-m-00000");
+ String[] textOutput = readFully(textOutPath).split("\n");
+ Path seqOutPath = new Path(outDir, "out2/part-m-00000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf);
+ Text key = new Text();
+ IntWritable value = new IntWritable();
+ String[] words = fileContent.split(" ");
+ Assert.assertEquals(words.length, textOutput.length);
+ LOG.info("Verifying file contents");
+ for (int i = 0; i < words.length; i++) {
+ Assert.assertEquals((i + 1) + "\t" + words[i], textOutput[i]);
+ reader.next(key, value);
+ Assert.assertEquals(words[i], key.toString());
+ Assert.assertEquals((i + 1), value.get());
+ }
+ Assert.assertFalse(reader.next(key, value));
+ }
+
+ /**
+ * A word count test job that reads an input file and writes the count of each
+ * word to a text file and a sequence file, using different key/value types.
+ */
+ @Test
+ public void testMultiOutputFormatWithReduce() throws Throwable {
+ Job job = new Job(mrConf, "MultiOutWithReduce");
+
+ job.setMapperClass(WordCountMapper.class);
+ job.setReducerClass(MultiOutWordCountReducer.class);
+ job.setJarByClass(this.getClass());
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(MultiOutputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+
+ configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
+ configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class,
+ IntWritable.class);
+ configurer.addOutputFormat("out3", NullOutputFormat.class, Text.class,
+ IntWritable.class);
+ Path outDir = new Path(workDir.getPath(), job.getJobName());
+ FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1"));
+ FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2"));
+
+ configurer.configure();
+
+ String fileContent = "Hello World Hello World World";
+ String inputFile = createInputFile(fileContent);
+ FileInputFormat.setInputPaths(job, new Path(inputFile));
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ Path textOutPath = new Path(outDir, "out1/part-r-00000");
+ String[] textOutput = readFully(textOutPath).split("\n");
+ Path seqOutPath = new Path(outDir, "out2/part-r-00000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf);
+ Text key = new Text();
+ IntWritable value = new IntWritable();
+ String[] words = "Hello World".split(" ");
+ Assert.assertEquals(words.length, textOutput.length);
+ for (int i = 0; i < words.length; i++) {
+ Assert.assertEquals((i + 2) + "\t" + words[i], textOutput[i]);
+ reader.next(key, value);
+ Assert.assertEquals(words[i], key.toString());
+ Assert.assertEquals((i + 2), value.get());
+ }
+ Assert.assertFalse(reader.next(key, value));
+
+ }
+
+
+ /**
+ * Create a file for map input
+ *
+ * @return absolute path of the file.
+ * @throws IOException if any error is encountered
+ */
+ private String createInputFile(String content) throws IOException {
+ File f = File.createTempFile("input", "txt");
+ FileWriter writer = new FileWriter(f);
+ writer.write(content);
+ writer.close();
+ return f.getAbsolutePath();
+ }
+
+ private String readFully(Path file) throws IOException {
+ FSDataInputStream in = fs.open(file);
+ byte[] b = new byte[in.available()];
+ in.readFully(b);
+ in.close();
+ return new String(b);
+ }
+
+ private static class MultiOutWordIndexMapper extends
+ Mapper<LongWritable, Text, Writable, Writable> {
+
+ private IntWritable index = new IntWritable(1);
+ private Text word = new Text();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ MultiOutputFormat.write("out1", index, word, context);
+ MultiOutputFormat.write("out2", word, index, context);
+ index.set(index.get() + 1);
+ }
+ }
+ }
+
+ private static class WordCountMapper extends
+ Mapper<LongWritable, Text, Text, IntWritable> {
+
+ private final static IntWritable one = new IntWritable(1);
+ private Text word = new Text();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ context.write(word, one);
+ }
+ }
+ }
+
+ private static class MultiOutWordCountReducer extends
+ Reducer<Text, IntWritable, Writable, Writable> {
+
+ private IntWritable count = new IntWritable();
+
+ @Override
+ protected void reduce(Text word, Iterable<IntWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int sum = 0;
+ for (IntWritable val : values) {
+ sum += val.get();
+ }
+ count.set(sum);
+ MultiOutputFormat.write("out1", count, word, context);
+ MultiOutputFormat.write("out2", word, count, context);
+ MultiOutputFormat.write("out3", word, count, context);
+ }
+ }
+
+ private static class NullOutputFormat<K, V> extends
+ org.apache.hadoop.mapreduce.lib.output.NullOutputFormat<K, V> {
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+ return new OutputCommitter() {
+ public void abortTask(TaskAttemptContext taskContext) {
+ }
+
+ public void cleanupJob(JobContext jobContext) {
+ }
+
+ public void commitJob(JobContext jobContext) {
+ }
+
+ public void commitTask(TaskAttemptContext taskContext) {
+ Assert.fail("needsTaskCommit is false but commitTask was called");
+ }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) {
+ }
+
+ public void setupTask(TaskAttemptContext taskContext) {
+ }
+ };
+ }
+ }
+
+}
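
For context, the pattern that TestMultiOutputFormat exercises can be condensed into the minimal driver sketch below. It is illustrative only and not part of this commit: the class name MultiOutputFormatSketch, the output aliases and the argument-based paths are placeholders, while the MultiOutputFormat, JobConfigurer and write() calls are the ones used in the test above.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hcatalog.mapreduce.MultiOutputFormat;
    import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;

    public class MultiOutputFormatSketch {

      // Mapper that routes each word to both named outputs, mirroring MultiOutWordIndexMapper above.
      public static class WordIndexMapper extends Mapper<LongWritable, Text, Writable, Writable> {
        private final IntWritable index = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            MultiOutputFormat.write("out1", index, word, context);
            MultiOutputFormat.write("out2", word, index, context);
            index.set(index.get() + 1);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "multi-out-example");
        job.setJarByClass(MultiOutputFormatSketch.class);
        job.setMapperClass(WordIndexMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MultiOutputFormat.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // One child configuration per alias; key/value classes may differ per alias.
        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
        configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
        configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class, IntWritable.class);

        // Per-alias settings go on the child Job returned by getJob(alias).
        FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(args[1], "out1"));
        FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(args[1], "out2"));

        // Merge the per-alias configurations back into the parent job, then run it.
        configurer.configure();
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
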
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestPassProperties.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestPassProperties.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestPassProperties.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestPassProperties.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.junit.Test;
+
+public class TestPassProperties {
+ private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
+ "/build/test/data/" + TestSequenceFileReadWrite.class.getCanonicalName();
+ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+ private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
+
+ private static Driver driver;
+ private static PigServer server;
+ private static String[] input;
+ private static HiveConf hiveConf;
+
+ public void Initialize() throws Exception {
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+
+ new File(TEST_WAREHOUSE_DIR).mkdirs();
+
+ int numRows = 3;
+ input = new String[numRows];
+ for (int i = 0; i < numRows; i++) {
+ String col1 = "a" + i;
+ String col2 = "b" + i;
+ input[i] = i + "," + col1 + "," + col2;
+ }
+ HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
+ server = new PigServer(ExecType.LOCAL);
+ }
+
+ @Test
+ public void testSequenceTableWriteReadMR() throws Exception {
+ Initialize();
+ String createTable = "CREATE TABLE bad_props_table(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
+ driver.run("drop table bad_props_table");
+ int retCode1 = driver.run(createTable).getResponseCode();
+ assertTrue(retCode1 == 0);
+
+ boolean caughtException = false;
+ try {
+ Configuration conf = new Configuration();
+ conf.set("hive.metastore.uris", "thrift://no.such.machine:10888");
+ conf.set("hive.metastore.local", "false");
+ Job job = new Job(conf, "Write-hcat-seq-table");
+ job.setJarByClass(TestSequenceFileReadWrite.class);
+
+ job.setMapperClass(Map.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(DefaultHCatRecord.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, INPUT_FILE_NAME);
+
+ HCatOutputFormat.setOutput(job, OutputJobInfo.create(
+ MetaStoreUtils.DEFAULT_DATABASE_NAME, "bad_props_table", null));
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ HCatOutputFormat.setSchema(job, getSchema());
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+ new FileOutputCommitterContainer(job, null).cleanupJob(job);
+ } catch (Exception e) {
+ caughtException = true;
+ assertTrue(e.getMessage().contains(
+ "Could not connect to meta store using any of the URIs provided"));
+ }
+ assertTrue(caughtException);
+ }
+
+ public static class Map extends Mapper<LongWritable, Text, NullWritable, DefaultHCatRecord> {
+
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String[] cols = value.toString().split(",");
+ DefaultHCatRecord record = new DefaultHCatRecord(3);
+ record.set(0, Integer.parseInt(cols[0]));
+ record.set(1, cols[1]);
+ record.set(2, cols[2]);
+ context.write(NullWritable.get(), record);
+ }
+ }
+
+ private HCatSchema getSchema() throws HCatException {
+ HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ schema.append(new HCatFieldSchema("a0", HCatFieldSchema.Type.INT,
+ ""));
+ schema.append(new HCatFieldSchema("a1",
+ HCatFieldSchema.Type.STRING, ""));
+ schema.append(new HCatFieldSchema("a2",
+ HCatFieldSchema.Type.STRING, ""));
+ return schema;
+ }
+
+
+}
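
The failure asserted above stems from the job configuration carrying only an unreachable hive.metastore.uris. For comparison, the working MR tests in TestSequenceFileReadWrite (added below) hand the full Hive configuration to the job by serializing it under the HCatalog key. A configuration-only fragment, assuming the hiveConf object and imports used in these tests:

    // Working pattern (see TestSequenceFileReadWrite below): serialize the HiveConf into the
    // job configuration so HCatOutputFormat can reach the metastore.
    Configuration conf = new Configuration();
    conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(hiveConf.getAllProperties()));

    // Failing pattern used in this test: point the job at an unreachable metastore.
    // conf.set("hive.metastore.uris", "thrift://no.such.machine:10888");
    // conf.set("hive.metastore.local", "false");
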
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+public class TestSequenceFileReadWrite extends TestCase {
+ private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
+ "/build/test/data/" + TestSequenceFileReadWrite.class.getCanonicalName();
+ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+ private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
+
+ private static Driver driver;
+ private static PigServer server;
+ private static String[] input;
+ private static HiveConf hiveConf;
+
+ public void Initialize() throws Exception {
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+
+ new File(TEST_WAREHOUSE_DIR).mkdirs();
+
+ int numRows = 3;
+ input = new String[numRows];
+ for (int i = 0; i < numRows; i++) {
+ String col1 = "a" + i;
+ String col2 = "b" + i;
+ input[i] = i + "," + col1 + "," + col2;
+ }
+ HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
+ server = new PigServer(ExecType.LOCAL);
+ }
+
+ @Test
+ public void testSequenceTableWriteRead() throws Exception {
+ Initialize();
+ String createTable = "CREATE TABLE demo_table(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
+ driver.run("drop table demo_table");
+ int retCode1 = driver.run(createTable).getResponseCode();
+ assertTrue(retCode1 == 0);
+
+ server.setBatchOn();
+ server.registerQuery("A = load '"
+ + INPUT_FILE_NAME
+ + "' using PigStorage(',') as (a0:int,a1:chararray,a2:chararray);");
+ server.registerQuery("store A into 'demo_table' using org.apache.hcatalog.pig.HCatStorer();");
+ server.executeBatch();
+
+ server.registerQuery("B = load 'demo_table' using org.apache.hcatalog.pig.HCatLoader();");
+ Iterator<Tuple> XIter = server.openIterator("B");
+ int numTuplesRead = 0;
+ while (XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertEquals(3, t.size());
+ assertEquals(t.get(0).toString(), "" + numTuplesRead);
+ assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+ assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+ numTuplesRead++;
+ }
+ assertEquals(input.length, numTuplesRead);
+ }
+
+ @Test
+ public void testTextTableWriteRead() throws Exception {
+ Initialize();
+ String createTable = "CREATE TABLE demo_table_1(a0 int, a1 String, a2 String) STORED AS TEXTFILE";
+ driver.run("drop table demo_table_1");
+ int retCode1 = driver.run(createTable).getResponseCode();
+ assertTrue(retCode1 == 0);
+
+ server.setBatchOn();
+ server.registerQuery("A = load '"
+ + INPUT_FILE_NAME
+ + "' using PigStorage(',') as (a0:int,a1:chararray,a2:chararray);");
+ server.registerQuery("store A into 'demo_table_1' using org.apache.hcatalog.pig.HCatStorer();");
+ server.executeBatch();
+
+ server.registerQuery("B = load 'demo_table_1' using org.apache.hcatalog.pig.HCatLoader();");
+ Iterator<Tuple> XIter = server.openIterator("B");
+ int numTuplesRead = 0;
+ while (XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertEquals(3, t.size());
+ assertEquals(t.get(0).toString(), "" + numTuplesRead);
+ assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+ assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+ numTuplesRead++;
+ }
+ assertEquals(input.length, numTuplesRead);
+ }
+
+ @Test
+ public void testSequenceTableWriteReadMR() throws Exception {
+ Initialize();
+ String createTable = "CREATE TABLE demo_table_2(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
+ driver.run("drop table demo_table_2");
+ int retCode1 = driver.run(createTable).getResponseCode();
+ assertTrue(retCode1 == 0);
+
+ Configuration conf = new Configuration();
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+ HCatUtil.serialize(hiveConf.getAllProperties()));
+ Job job = new Job(conf, "Write-hcat-seq-table");
+ job.setJarByClass(TestSequenceFileReadWrite.class);
+
+ job.setMapperClass(Map.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(DefaultHCatRecord.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, INPUT_FILE_NAME);
+
+ HCatOutputFormat.setOutput(job, OutputJobInfo.create(
+ MetaStoreUtils.DEFAULT_DATABASE_NAME, "demo_table_2", null));
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ HCatOutputFormat.setSchema(job, getSchema());
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+ if (!HCatUtil.isHadoop23()) {
+ new FileOutputCommitterContainer(job, null).commitJob(job);
+ }
+ assertTrue(job.isSuccessful());
+
+ server.setBatchOn();
+ server.registerQuery("C = load 'default.demo_table_2' using org.apache.hcatalog.pig.HCatLoader();");
+ server.executeBatch();
+ Iterator<Tuple> XIter = server.openIterator("C");
+ int numTuplesRead = 0;
+ while (XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertEquals(3, t.size());
+ assertEquals(t.get(0).toString(), "" + numTuplesRead);
+ assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+ assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+ numTuplesRead++;
+ }
+ assertEquals(input.length, numTuplesRead);
+ }
+
+ @Test
+ public void testTextTableWriteReadMR() throws Exception {
+ Initialize();
+ String createTable = "CREATE TABLE demo_table_3(a0 int, a1 String, a2 String) STORED AS TEXTFILE";
+ driver.run("drop table demo_table_3");
+ int retCode1 = driver.run(createTable).getResponseCode();
+ assertTrue(retCode1 == 0);
+
+ Configuration conf = new Configuration();
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+ HCatUtil.serialize(hiveConf.getAllProperties()));
+ Job job = new Job(conf, "Write-hcat-text-table");
+ job.setJarByClass(TestSequenceFileReadWrite.class);
+
+ job.setMapperClass(Map.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(DefaultHCatRecord.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setNumReduceTasks(0);
+ TextInputFormat.setInputPaths(job, INPUT_FILE_NAME);
+
+ HCatOutputFormat.setOutput(job, OutputJobInfo.create(
+ MetaStoreUtils.DEFAULT_DATABASE_NAME, "demo_table_3", null));
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ HCatOutputFormat.setSchema(job, getSchema());
+ assertTrue(job.waitForCompletion(true));
+ if (!HCatUtil.isHadoop23()) {
+ new FileOutputCommitterContainer(job, null).commitJob(job);
+ }
+ assertTrue(job.isSuccessful());
+
+ server.setBatchOn();
+ server.registerQuery("D = load 'default.demo_table_3' using org.apache.hcatalog.pig.HCatLoader();");
+ server.executeBatch();
+ Iterator<Tuple> XIter = server.openIterator("D");
+ int numTuplesRead = 0;
+ while (XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertEquals(3, t.size());
+ assertEquals(t.get(0).toString(), "" + numTuplesRead);
+ assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+ assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+ numTuplesRead++;
+ }
+ assertEquals(input.length, numTuplesRead);
+ }
+
+
+ public static class Map extends Mapper<LongWritable, Text, NullWritable, DefaultHCatRecord> {
+
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String[] cols = value.toString().split(",");
+ DefaultHCatRecord record = new DefaultHCatRecord(3);
+ record.set(0, Integer.parseInt(cols[0]));
+ record.set(1, cols[1]);
+ record.set(2, cols[2]);
+ context.write(NullWritable.get(), record);
+ }
+ }
+
+ private HCatSchema getSchema() throws HCatException {
+ HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ schema.append(new HCatFieldSchema("a0", HCatFieldSchema.Type.INT,
+ ""));
+ schema.append(new HCatFieldSchema("a1",
+ HCatFieldSchema.Type.STRING, ""));
+ schema.append(new HCatFieldSchema("a2",
+ HCatFieldSchema.Type.STRING, ""));
+ return schema;
+ }
+
+}
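
Condensed, the HCatStorer/HCatLoader round trip that these tests run through Pig looks roughly like the sketch below. It is illustrative only and not part of this commit: the class name is a placeholder, while the table name, schema and Pig statements are those used in the tests above, with HCatalog and its Pig adapter assumed to be on the classpath.

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class HCatPigRoundTripSketch {
      public static void main(String[] args) throws Exception {
        // Store a comma-separated local file (args[0]) into the existing HCatalog table
        // "demo_table" with HCatStorer, then read it back with HCatLoader.
        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + args[0]
            + "' using PigStorage(',') as (a0:int, a1:chararray, a2:chararray);");
        server.registerQuery("store A into 'demo_table' using org.apache.hcatalog.pig.HCatStorer();");
        server.executeBatch();

        server.registerQuery("B = load 'demo_table' using org.apache.hcatalog.pig.HCatLoader();");
        Iterator<Tuple> rows = server.openIterator("B");
        while (rows.hasNext()) {
          System.out.println(rows.next());
        }
      }
    }
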
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java.broken (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java.broken Fri Sep 6 00:49:14 2013
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.rcfile;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatDataCheckUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+
+
+public class TestRCFileInputStorageDriver extends TestCase{
+ private static final Configuration conf = new Configuration();
+ private static final Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
+ private static final Path file = new Path(dir, "test_rcfile");
+ private final HCatHadoopShims shim = HCatHadoopShims.Instance.get();
+
+ // Generate sample records to compare against
+ private byte[][][] getRecords() throws UnsupportedEncodingException {
+ byte[][] record_1 = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+ "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "hcatalog and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8")};
+ byte[][] record_2 = {"100".getBytes("UTF-8"), "200".getBytes("UTF-8"),
+ "123".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "hcatalog and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8")};
+ return new byte[][][]{record_1, record_2};
+ }
+
+ // Write sample records to file for individual tests
+ private BytesRefArrayWritable[] initTestEnvironment() throws IOException {
+ FileSystem fs = FileSystem.getLocal(conf);
+ fs.delete(file, true);
+
+ byte [][][] records = getRecords();
+ RCFileOutputFormat.setColumnNumber(conf, 8);
+ RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null, new DefaultCodec());
+
+ BytesRefArrayWritable bytes = writeBytesToFile(records[0], writer);
+ BytesRefArrayWritable bytes2 = writeBytesToFile(records[1], writer);
+
+ writer.close();
+ return new BytesRefArrayWritable[]{bytes,bytes2};
+ }
+
+ private BytesRefArrayWritable writeBytesToFile(byte[][] record, RCFile.Writer writer) throws IOException {
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);
+ for (int i = 0; i < record.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(record[i], 0, record[i].length);
+ bytes.set(i, cu);
+ }
+ writer.append(bytes);
+ return bytes;
+ }
+
+ public void testConvertValueToTuple() throws IOException,InterruptedException{
+ BytesRefArrayWritable[] bytesArr = initTestEnvironment();
+
+ HCatSchema schema = buildHiveSchema();
+ RCFileInputDriver sd = new RCFileInputDriver();
+ JobContext jc = shim.createJobContext(conf, new JobID());
+ sd.setInputPath(jc, file.toString());
+ InputFormat<?,?> iF = sd.getInputFormat(null);
+ InputSplit split = iF.getSplits(jc).get(0);
+ sd.setOriginalSchema(jc, schema);
+ sd.setOutputSchema(jc, schema);
+ sd.initialize(jc, getProps());
+
+ TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
+ RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+ rr.initialize(split, tac);
+ HCatRecord[] tuples = getExpectedRecords();
+ for(int j=0; j < 2; j++){
+ Assert.assertTrue(rr.nextKeyValue());
+ BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+ Assert.assertEquals(bytesArr[j], w);
+ HCatRecord t = sd.convertToHCatRecord(null,w);
+ Assert.assertEquals(8, t.size());
+ Assert.assertTrue(HCatDataCheckUtil.recordsEqual(t,tuples[j]));
+ }
+ }
+
+ public void testPruning() throws IOException,InterruptedException{
+ BytesRefArrayWritable[] bytesArr = initTestEnvironment();
+
+ RCFileInputDriver sd = new RCFileInputDriver();
+ JobContext jc = shim.createJobContext(conf, new JobID());
+ sd.setInputPath(jc, file.toString());
+ InputFormat<?,?> iF = sd.getInputFormat(null);
+ InputSplit split = iF.getSplits(jc).get(0);
+ sd.setOriginalSchema(jc, buildHiveSchema());
+ sd.setOutputSchema(jc, buildPrunedSchema());
+
+ sd.initialize(jc, getProps());
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+ TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
+ RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+ rr.initialize(split, tac);
+ HCatRecord[] tuples = getPrunedRecords();
+ for(int j=0; j < 2; j++){
+ Assert.assertTrue(rr.nextKeyValue());
+ BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+ Assert.assertFalse(bytesArr[j].equals(w));
+ Assert.assertEquals(w.size(), 8);
+ HCatRecord t = sd.convertToHCatRecord(null,w);
+ Assert.assertEquals(5, t.size());
+ Assert.assertTrue(HCatDataCheckUtil.recordsEqual(t,tuples[j]));
+ }
+ assertFalse(rr.nextKeyValue());
+ }
+
+ public void testReorderdCols() throws IOException,InterruptedException{
+ BytesRefArrayWritable[] bytesArr = initTestEnvironment();
+
+ RCFileInputDriver sd = new RCFileInputDriver();
+ JobContext jc = shim.createJobContext(conf, new JobID());
+ sd.setInputPath(jc, file.toString());
+ InputFormat<?,?> iF = sd.getInputFormat(null);
+ InputSplit split = iF.getSplits(jc).get(0);
+ sd.setOriginalSchema(jc, buildHiveSchema());
+ sd.setOutputSchema(jc, buildReorderedSchema());
+
+ sd.initialize(jc, getProps());
+ Map<String,String> map = new HashMap<String,String>(1);
+ map.put("part1", "first-part");
+ sd.setPartitionValues(jc, map);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+ TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
+ RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+ rr.initialize(split, tac);
+ HCatRecord[] tuples = getReorderedCols();
+ for(int j=0; j < 2; j++){
+ Assert.assertTrue(rr.nextKeyValue());
+ BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+ Assert.assertFalse(bytesArr[j].equals(w));
+ Assert.assertEquals(w.size(), 8);
+ HCatRecord t = sd.convertToHCatRecord(null,w);
+ Assert.assertEquals(7, t.size());
+ Assert.assertTrue(HCatDataCheckUtil.recordsEqual(t,tuples[j]));
+ }
+ assertFalse(rr.nextKeyValue());
+ }
+ private HCatRecord[] getExpectedRecords(){
+ List<Object> rec_1 = new ArrayList<Object>(8);
+ Collections.addAll(rec_1, new Byte("123"),
+ new Short("456"),
+ new Integer(789),
+ new Long(1000L),
+ new Double(5.3D),
+ new String("hcatalog and hadoop"),
+ null,
+ null);
+
+ HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+ List<Object> rec_2 = new ArrayList<Object>(8);
+ Collections.addAll(rec_2, new Byte("100"),
+ new Short("200"),
+ new Integer(123),
+ new Long(1000L),
+ new Double(5.3D),
+ new String("hcatalog and hadoop"),
+ null,
+ null);
+ HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+ return new HCatRecord[]{tup_1,tup_2};
+ }
+
+ private HCatRecord[] getPrunedRecords(){
+ List<Object> rec_1 = new ArrayList<Object>(8);
+ Collections.addAll(rec_1, new Byte("123"),
+ new Integer(789),
+ new Double(5.3D),
+ new String("hcatalog and hadoop"),
+ null);
+ HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+ List<Object> rec_2 = new ArrayList<Object>(8);
+ Collections.addAll(rec_2, new Byte("100"),
+ new Integer(123),
+ new Double(5.3D),
+ new String("hcatalog and hadoop"),
+ null);
+ HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+ return new HCatRecord[]{tup_1,tup_2};
+ }
+
+ private HCatSchema buildHiveSchema() throws HCatException{
+ return new HCatSchema(HCatUtil.getHCatFieldSchemaList(new FieldSchema("atinyint", "tinyint", ""),
+ new FieldSchema("asmallint", "smallint", ""),
+ new FieldSchema("aint", "int", ""),
+ new FieldSchema("along", "bigint", ""),
+ new FieldSchema("adouble", "double", ""),
+ new FieldSchema("astring", "string", ""),
+ new FieldSchema("anullint", "int", ""),
+ new FieldSchema("anullstring", "string", "")));
+ }
+
+ private HCatSchema buildPrunedSchema() throws HCatException{
+ return new HCatSchema(HCatUtil.getHCatFieldSchemaList(new FieldSchema("atinyint", "tinyint", ""),
+ new FieldSchema("aint", "int", ""),
+ new FieldSchema("adouble", "double", ""),
+ new FieldSchema("astring", "string", ""),
+ new FieldSchema("anullint", "int", "")));
+ }
+
+ private HCatSchema buildReorderedSchema() throws HCatException{
+ return new HCatSchema(HCatUtil.getHCatFieldSchemaList(new FieldSchema("aint", "int", ""),
+ new FieldSchema("part1", "string", ""),
+ new FieldSchema("adouble", "double", ""),
+ new FieldSchema("newCol", "tinyint", ""),
+ new FieldSchema("astring", "string", ""),
+ new FieldSchema("atinyint", "tinyint", ""),
+ new FieldSchema("anullint", "int", "")));
+ }
+
+ private HCatRecord[] getReorderedCols(){
+ List<Object> rec_1 = new ArrayList<Object>(7);
+ Collections.addAll(rec_1, new Integer(789),
+ new String("first-part"),
+ new Double(5.3D),
+ null, // new column
+ new String("hcatalog and hadoop"),
+ new Byte("123"),
+ null);
+ HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+ List<Object> rec_2 = new ArrayList<Object>(7);
+ Collections.addAll(rec_2, new Integer(123),
+ new String("first-part"),
+ new Double(5.3D),
+ null,
+ new String("hcatalog and hadoop"),
+ new Byte("100"),
+ null);
+ HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+ return new HCatRecord[]{tup_1,tup_2};
+
+ }
+ private Properties getProps(){
+ Properties props = new Properties();
+ props.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "\\N");
+ props.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ return props;
+ }
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.rcfile;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TestRCFileMapReduceInputFormat.
+ *
+ */
+public class TestRCFileMapReduceInputFormat extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRCFileMapReduceInputFormat.class);
+
+ private static Configuration conf = new Configuration();
+
+ private static ColumnarSerDe serDe;
+
+ private static Path file;
+
+ private static FileSystem fs;
+
+ private static Properties tbl;
+
+ static {
+ try {
+ fs = FileSystem.getLocal(conf);
+ Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
+ file = new Path(dir, "test_rcfile");
+ fs.delete(dir, true);
+ // the SerDe part is from TestLazySimpleSerDe
+ serDe = new ColumnarSerDe();
+ // Create the SerDe
+ tbl = createProperties();
+ serDe.initialize(conf, tbl);
+ } catch (Exception e) {
+ }
+ }
+
+ private static BytesRefArrayWritable partialS = new BytesRefArrayWritable();
+
+ private static byte[][] bytesArray = null;
+
+ private static BytesRefArrayWritable s = null;
+
+ static {
+ try {
+ bytesArray = new byte[][]{"123".getBytes("UTF-8"),
+ "456".getBytes("UTF-8"), "789".getBytes("UTF-8"),
+ "1000".getBytes("UTF-8"), "5.3".getBytes("UTF-8"),
+ "hive and hadoop".getBytes("UTF-8"), new byte[0],
+ "NULL".getBytes("UTF-8")};
+ s = new BytesRefArrayWritable(bytesArray.length);
+ s.set(0, new BytesRefWritable("123".getBytes("UTF-8")));
+ s.set(1, new BytesRefWritable("456".getBytes("UTF-8")));
+ s.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
+ s.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
+ s.set(4, new BytesRefWritable("5.3".getBytes("UTF-8")));
+ s.set(5, new BytesRefWritable("hive and hadoop".getBytes("UTF-8")));
+ s.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ s.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
+
+ // partial test init
+ partialS.set(0, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(1, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
+ partialS.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
+ partialS.set(4, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(5, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
+
+ } catch (UnsupportedEncodingException e) {
+ }
+ }
+
+
+ /** For debugging and testing. */
+ public static void main(String[] args) throws Exception {
+ int count = 10000;
+ boolean create = true;
+
+ String usage = "Usage: RCFile " + "[-count N]" + " file";
+ if (args.length == 0) {
+ LOG.error(usage);
+ System.exit(-1);
+ }
+
+ try {
+ for (int i = 0; i < args.length; ++i) { // parse command line
+ if (args[i] == null) {
+ continue;
+ } else if (args[i].equals("-count")) {
+ count = Integer.parseInt(args[++i]);
+ } else {
+ // file is required parameter
+ file = new Path(args[i]);
+ }
+ }
+
+ if (file == null) {
+ LOG.error(usage);
+ System.exit(-1);
+ }
+
+ LOG.info("count = {}", count);
+ LOG.info("create = {}", create);
+ LOG.info("file = {}", file);
+
+ // test.performanceTest();
+ LOG.info("Finished.");
+ } finally {
+ fs.close();
+ }
+ }
+
+ private static Properties createProperties() {
+ Properties tbl = new Properties();
+
+ // Set the configuration parameters
+ tbl.setProperty(serdeConstants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty("columns",
+ "abyte,ashort,aint,along,adouble,astring,anullint,anullstring");
+ tbl.setProperty("columns.types",
+ "tinyint:smallint:int:bigint:double:string:int:string");
+ tbl.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
+ return tbl;
+ }
+
+
+ public void testSyncAndSplit() throws IOException, InterruptedException {
+ splitBeforeSync();
+ splitRightBeforeSync();
+ splitInMiddleOfSync();
+ splitRightAfterSync();
+ splitAfterSync();
+ }
+
+ private void splitBeforeSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(600, 1000, 2, 17684, null);
+ }
+
+ private void splitRightBeforeSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 17750, null);
+ }
+
+ private void splitInMiddleOfSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 17760, null);
+
+ }
+
+ private void splitRightAfterSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 17770, null);
+ }
+
+ private void splitAfterSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 19950, null);
+ }
+
+ private void writeThenReadByRecordReader(int intervalRecordCount,
+ int writeCount, int splitNumber, long maxSplitSize, CompressionCodec codec)
+ throws IOException, InterruptedException {
+ Path testDir = new Path(System.getProperty("test.data.dir", ".")
+ + "/mapred/testsmallfirstsplit");
+ Path testFile = new Path(testDir, "test_rcfile");
+ fs.delete(testFile, true);
+ Configuration cloneConf = new Configuration(conf);
+ RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
+ cloneConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, intervalRecordCount);
+
+ RCFile.Writer writer = new RCFile.Writer(fs, cloneConf, testFile, null, codec);
+
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(bytesArray.length);
+ for (int i = 0; i < bytesArray.length; i++) {
+ BytesRefWritable cu = null;
+ cu = new BytesRefWritable(bytesArray[i], 0, bytesArray[i].length);
+ bytes.set(i, cu);
+ }
+ for (int i = 0; i < writeCount; i++) {
+ writer.append(bytes);
+ }
+ writer.close();
+
+ RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable> inputFormat = new RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable>();
+ Configuration jobConf = new Configuration(cloneConf);
+ jobConf.set("mapred.input.dir", testDir.toString());
+ JobContext context = new Job(jobConf);
+ context.getConfiguration().setLong("mapred.max.split.size", maxSplitSize);
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ assertEquals("splits length should be " + splitNumber, splits.size(), splitNumber);
+ int readCount = 0;
+ for (int i = 0; i < splits.size(); i++) {
+ TaskAttemptContext tac = HCatHadoopShims.Instance.get().createTaskAttemptContext(jobConf, new TaskAttemptID());
+ RecordReader<LongWritable, BytesRefArrayWritable> rr = inputFormat.createRecordReader(splits.get(i), tac);
+ rr.initialize(splits.get(i), tac);
+ while (rr.nextKeyValue()) {
+ readCount++;
+ }
+ }
+ assertEquals("readCount should be equal to writeCount", readCount, writeCount);
+ }
+
+}
+
+
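
For reference, the read half of writeThenReadByRecordReader can be reduced to the fragment below. It is a sketch, not part of the commit: it assumes the imports used in TestRCFileMapReduceInputFormat above, an existing RCFile under the placeholder path, and an enclosing method that may throw IOException and InterruptedException.

    Configuration conf = new Configuration();
    conf.set("mapred.input.dir", "/tmp/rcfile-dir");        // placeholder input directory
    JobContext context = new Job(conf);
    RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable> inputFormat =
        new RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable>();
    for (InputSplit split : inputFormat.getSplits(context)) {
      TaskAttemptContext tac =
          HCatHadoopShims.Instance.get().createTaskAttemptContext(conf, new TaskAttemptID());
      RecordReader<LongWritable, BytesRefArrayWritable> rr =
          inputFormat.createRecordReader(split, tac);
      rr.initialize(split, tac);
      while (rr.nextKeyValue()) {
        BytesRefArrayWritable row = rr.getCurrentValue();    // one row, one BytesRefWritable per column
        // process row ...
      }
      rr.close();
    }
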
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java.broken (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java.broken Fri Sep 6 00:49:14 2013
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.rcfile;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+
+public class TestRCFileOutputStorageDriver extends TestCase {
+
+ public void testConversion() throws IOException {
+ Configuration conf = new Configuration();
+ JobContext jc = HCatHadoopShims.Instance.get().createJobContext(conf, new JobID());
+ String jobString = HCatUtil.serialize(OutputJobInfo.create(null,null,null));
+ jc.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,jobString);
+
+ HCatSchema schema = buildHiveSchema();
+ HCatInputStorageDriver isd = new RCFileInputDriver();
+
+ isd.setOriginalSchema(jc, schema);
+ isd.setOutputSchema(jc, schema);
+ isd.initialize(jc, new Properties());
+
+ byte[][] byteArray = buildBytesArray();
+
+ BytesRefArrayWritable bytesWritable = new BytesRefArrayWritable(byteArray.length);
+ for (int i = 0; i < byteArray.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(byteArray[i], 0, byteArray[i].length);
+ bytesWritable.set(i, cu);
+ }
+
+ //Convert byte array to HCatRecord using isd, convert hcatrecord back to byte array
+ //using osd, compare the two arrays
+ HCatRecord record = isd.convertToHCatRecord(null, bytesWritable);
+
+ HCatOutputStorageDriver osd = new RCFileOutputDriver();
+
+ osd.setSchema(jc, schema);
+ osd.initialize(jc, new Properties());
+
+ BytesRefArrayWritable bytesWritableOutput = (BytesRefArrayWritable) osd.convertValue(record);
+
+ assertTrue(bytesWritableOutput.compareTo(bytesWritable) == 0);
+ }
+
+ private byte[][] buildBytesArray() throws UnsupportedEncodingException {
+ byte[][] bytes = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+ "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "hcat and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8") };
+ return bytes;
+ }
+
+ private HCatSchema buildHiveSchema() throws HCatException{
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>(8);
+ fields.add(new FieldSchema("atinyint", "tinyint", ""));
+ fields.add(new FieldSchema("asmallint", "smallint", ""));
+ fields.add(new FieldSchema("aint", "int", ""));
+ fields.add(new FieldSchema("along", "bigint", ""));
+ fields.add(new FieldSchema("adouble", "double", ""));
+ fields.add(new FieldSchema("astring", "string", ""));
+ fields.add(new FieldSchema("anullint", "int", ""));
+ fields.add(new FieldSchema("anullstring", "string", ""));
+
+ return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+ }
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/security/TestHdfsAuthorizationProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/security/TestHdfsAuthorizationProvider.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/security/TestHdfsAuthorizationProvider.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/security/TestHdfsAuthorizationProvider.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.security;
+
+import static org.apache.hcatalog.HcatTestUtils.perm300;
+import static org.apache.hcatalog.HcatTestUtils.perm500;
+import static org.apache.hcatalog.HcatTestUtils.perm555;
+import static org.apache.hcatalog.HcatTestUtils.perm700;
+import static org.apache.hcatalog.HcatTestUtils.perm755;
+
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
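+/**
+ * Verifies that HCatalog DDL commands are authorized against the HDFS permissions of the
+ * underlying warehouse, database, table and partition directories.
+ */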
+public class TestHdfsAuthorizationProvider {
+
+ protected HCatDriver hcatDriver;
+ protected HiveMetaStoreClient msc;
+ protected HiveConf conf;
+ protected String whDir;
+ protected Path whPath;
+ protected FileSystem whFs;
+ protected Warehouse wh;
+ protected Hive hive;
+
+ @Before
+ public void setUp() throws Exception {
+
+ conf = new HiveConf(this.getClass());
+ conf.set(ConfVars.PREEXECHOOKS.varname, "");
+ conf.set(ConfVars.POSTEXECHOOKS.varname, "");
+ conf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+ conf.set("hive.metastore.local", "true");
+ conf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
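+ // enable authorization and delegate the checks to the storage layer (HDFS file permissions)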
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
+ conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ StorageDelegationAuthorizationProvider.class.getCanonicalName());
+ conf.set("fs.pfile.impl", "org.apache.hadoop.fs.ProxyLocalFileSystem");
+
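+ // point the metastore warehouse at a scratch directory whose permissions the test can change freely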
+ whDir = System.getProperty("test.warehouse.dir", "/tmp/testhdfsauthorization_wh");
+ conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, whDir);
+
+ UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+ String username = ShimLoader.getHadoopShims().getShortUserName(ugi);
+
+ whPath = new Path(whDir);
+ whFs = whPath.getFileSystem(conf);
+
+ wh = new Warehouse(conf);
+ hive = Hive.get(conf);
+
+ //clean up leftover objects in the Hive metastore
+ HcatTestUtils.cleanupHMS(hive, wh, perm700);
+
+ whFs.delete(whPath, true);
+ whFs.mkdirs(whPath, perm755);
+
+ SessionState.start(new CliSessionState(conf));
+ hcatDriver = new HCatDriver();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ whFs.close();
+ hcatDriver.close();
+ Hive.closeCurrent();
+ }
+
+ public Path getDbPath(String dbName) throws MetaException, HiveException {
+ return HcatTestUtils.getDbPath(hive, wh, dbName);
+ }
+
+ public Path getTablePath(String dbName, String tableName) throws HiveException {
+ Table table = hive.getTable(dbName, tableName);
+ return table.getPath();
+ }
+
+ public Path getPartPath(String partName, String dbName, String tableName) throws HiveException {
+ return new Path(getTablePath(dbName, tableName), partName);
+ }
+
+ /** Executes the given query, expecting it to succeed. */
+ public void exec(String format, Object... args) throws Exception {
+ String command = String.format(format, args);
+ CommandProcessorResponse resp = hcatDriver.run(command);
+ Assert.assertEquals(resp.getErrorMessage(), 0, resp.getResponseCode());
+ Assert.assertNull(resp.getErrorMessage(), resp.getErrorMessage());
+ }
+
+ /** Executes the given query, expecting it to fail with an authorization error (AccessControlException). */
+ public void execFail(String format, Object... args) throws Exception {
+ String command = String.format(format, args);
+ CommandProcessorResponse resp = hcatDriver.run(command);
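+ // the command must be rejected: non-zero response code and, if present, an HDFS access control error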
+ Assert.assertTrue(resp.getErrorMessage(), resp.getResponseCode() != 0);
+ Assert.assertTrue((resp.getResponseCode() == 40000) || (resp.getResponseCode() == 403));
+ if (resp.getErrorMessage() != null) {
+ Assert.assertTrue(resp.getErrorMessage().contains("org.apache.hadoop.security.AccessControlException"));
+ }
+ }
+
+
+ /**
+ * Tests whether the warehouse directory is writable by the current user (as defined by Hadoop)
+ */
+ @Test
+ public void testWarehouseIsWritable() throws Exception {
+ Path top = new Path(whPath, "_foobarbaz12_");
+ try {
+ whFs.mkdirs(top);
+ } finally {
+ whFs.delete(top, true);
+ }
+ }
+
+ @Test
+ public void testShowDatabases() throws Exception {
+ exec("CREATE DATABASE doo");
+ exec("SHOW DATABASES");
+
+ whFs.setPermission(whPath, perm300); //revoke r
+ execFail("SHOW DATABASES");
+ }
+
+ @Test
+ public void testDatabaseOps() throws Exception {
+ exec("SHOW TABLES");
+ exec("SHOW TABLE EXTENDED LIKE foo1");
+
+ whFs.setPermission(whPath, perm700);
+ exec("CREATE DATABASE doo");
+ exec("DESCRIBE DATABASE doo");
+ exec("USE doo");
+ exec("SHOW TABLES");
+ exec("SHOW TABLE EXTENDED LIKE foo1");
+ exec("DROP DATABASE doo");
+
+ //custom location
+ Path dbPath = new Path(whPath, new Random().nextInt() + "/mydb");
+ whFs.mkdirs(dbPath, perm700);
+ exec("CREATE DATABASE doo2 LOCATION '%s'", dbPath.toUri());
+ exec("DESCRIBE DATABASE doo2", dbPath.toUri());
+ exec("USE doo2");
+ exec("SHOW TABLES");
+ exec("SHOW TABLE EXTENDED LIKE foo1");
+ exec("DROP DATABASE doo2", dbPath.toUri());
+
+ //custom non-existing location
+ exec("CREATE DATABASE doo3 LOCATION '%s/subpath'", dbPath.toUri());
+ }
+
+ @Test
+ public void testCreateDatabaseFail1() throws Exception {
+ whFs.setPermission(whPath, perm500);
+ execFail("CREATE DATABASE doo"); //in the default location
+
+ whFs.setPermission(whPath, perm555);
+ execFail("CREATE DATABASE doo2");
+ }
+
+ @Test
+ public void testCreateDatabaseFail2() throws Exception {
+ //custom location
+ Path dbPath = new Path(whPath, new Random().nextInt() + "/mydb");
+
+ whFs.mkdirs(dbPath, perm700);
+ whFs.setPermission(dbPath, perm500);
+ execFail("CREATE DATABASE doo2 LOCATION '%s'", dbPath.toUri());
+ }
+
+ @Test
+ public void testDropDatabaseFail1() throws Exception {
+ whFs.setPermission(whPath, perm700);
+ exec("CREATE DATABASE doo"); //in the default location
+
+ whFs.setPermission(getDbPath("doo"), perm500); //revoke write
+ execFail("DROP DATABASE doo");
+ }
+
+ @Test
+ public void testDropDatabaseFail2() throws Exception {
+ //custom location
+ Path dbPath = new Path(whPath, new Random().nextInt() + "/mydb");
+
+ whFs.mkdirs(dbPath, perm700);
+ exec("CREATE DATABASE doo2 LOCATION '%s'", dbPath.toUri());
+
+ whFs.setPermission(dbPath, perm500);
+ execFail("DROP DATABASE doo2");
+ }
+
+ @Test
+ public void testDescSwitchDatabaseFail() throws Exception {
+ whFs.setPermission(whPath, perm700);
+ exec("CREATE DATABASE doo");
+ whFs.setPermission(getDbPath("doo"), perm300); //revoke read
+ execFail("DESCRIBE DATABASE doo");
+ execFail("USE doo");
+
+ //custom location
+ Path dbPath = new Path(whPath, new Random().nextInt() + "/mydb");
+ whFs.mkdirs(dbPath, perm700);
+ exec("CREATE DATABASE doo2 LOCATION '%s'", dbPath.toUri());
+ whFs.mkdirs(dbPath, perm300); //revoke read
+ execFail("DESCRIBE DATABASE doo2", dbPath.toUri());
+ execFail("USE doo2");
+ }
+
+ @Test
+ public void testShowTablesFail() throws Exception {
+ whFs.setPermission(whPath, perm700);
+ exec("CREATE DATABASE doo");
+ exec("USE doo");
+ whFs.setPermission(getDbPath("doo"), perm300); //revoke read
+ execFail("SHOW TABLES");
+ execFail("SHOW TABLE EXTENDED LIKE foo1");
+ }
+
+ @Test
+ public void testTableOps() throws Exception {
+ //default db
+ exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ exec("DESCRIBE foo1");
+ exec("DROP TABLE foo1");
+
+ //default db custom location
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ whFs.mkdirs(tablePath, perm700);
+ exec("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+ exec("DESCRIBE foo2");
+ exec("DROP TABLE foo2");
+
+ //default db custom non-existing location
+ exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s/subpath'", tablePath);
+ exec("DESCRIBE foo3");
+ exec("DROP TABLE foo3");
+
+ //non default db
+ exec("CREATE DATABASE doo");
+ exec("USE doo");
+
+ exec("CREATE TABLE foo4 (foo INT) STORED AS RCFILE");
+ exec("DESCRIBE foo4");
+ exec("DROP TABLE foo4");
+
+ //non-default db custom location
+ tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ whFs.mkdirs(tablePath, perm700);
+ exec("CREATE EXTERNAL TABLE foo5 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+ exec("DESCRIBE foo5");
+ exec("DROP TABLE foo5");
+
+ //non-default db custom non-existing location
+ exec("CREATE EXTERNAL TABLE foo6 (foo INT) STORED AS RCFILE LOCATION '%s/subpath'", tablePath);
+ exec("DESCRIBE foo6");
+ exec("DROP TABLE foo6");
+
+ exec("DROP TABLE IF EXISTS foo_non_exists");
+
+ exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ exec("DESCRIBE EXTENDED foo1");
+ exec("DESCRIBE FORMATTED foo1");
+ exec("DESCRIBE foo1.foo");
+
+ //deep non-existing path for the table
+ tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ whFs.mkdirs(tablePath, perm700);
+ exec("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s/a/a/a/'", tablePath);
+ }
+
+ @Test
+ public void testCreateTableFail1() throws Exception {
+ //default db
+ whFs.mkdirs(whPath, perm500); //revoke w
+ execFail("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ }
+
+ @Test
+ public void testCreateTableFail2() throws Exception {
+ //default db custom location
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ whFs.mkdirs(tablePath, perm500);
+ execFail("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+
+ //default db custom non-existing location
+ execFail("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s/subpath'", tablePath);
+ }
+
+ @Test
+ public void testCreateTableFail3() throws Exception {
+ //non default db
+ exec("CREATE DATABASE doo");
+ whFs.setPermission(getDbPath("doo"), perm500);
+
+ execFail("CREATE TABLE doo.foo4 (foo INT) STORED AS RCFILE");
+
+ //non-default db custom location, permission to write to tablePath, but not on db path
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ whFs.mkdirs(tablePath, perm700);
+ exec("USE doo");
+ execFail("CREATE EXTERNAL TABLE foo5 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+ }
+
+ @Test
+ public void testCreateTableFail4() throws Exception {
+ //non default db
+ exec("CREATE DATABASE doo");
+
+ //non-default db custom location
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ whFs.mkdirs(tablePath, perm500);
+ execFail("CREATE EXTERNAL TABLE doo.foo5 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+
+ //non-default db custom non-existing location
+ execFail("CREATE EXTERNAL TABLE doo.foo6 (foo INT) STORED AS RCFILE LOCATION '%s/a/a/a/'", tablePath);
+ }
+
+ @Test
+ public void testDropTableFail1() throws Exception {
+ //default db
+ exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ whFs.mkdirs(getTablePath("default", "foo1"), perm500); //revoke w
+ execFail("DROP TABLE foo1");
+ }
+
+ @Test
+ public void testDropTableFail2() throws Exception {
+ //default db custom location
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ exec("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+ whFs.mkdirs(tablePath, perm500);
+ execFail("DROP TABLE foo2");
+ }
+
+ @Test
+ public void testDropTableFail4() throws Exception {
+ //non default db
+ exec("CREATE DATABASE doo");
+
+ //non-default db custom location
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+
+ exec("CREATE EXTERNAL TABLE doo.foo5 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+ whFs.mkdirs(tablePath, perm500);
+ exec("USE doo"); //There is no DROP TABLE doo.foo5 support in Hive
+ execFail("DROP TABLE foo5");
+ }
+
+ @Test
+ public void testDescTableFail() throws Exception {
+ //default db
+ exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ whFs.mkdirs(getTablePath("default", "foo1"), perm300); //revoke read
+ execFail("DESCRIBE foo1");
+
+ //default db custom location
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ whFs.mkdirs(tablePath, perm700);
+ exec("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+ whFs.mkdirs(tablePath, perm300); //revoke read
+ execFail("DESCRIBE foo2");
+ }
+
+ @Test
+ public void testAlterTableRename() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ exec("ALTER TABLE foo1 RENAME TO foo2");
+
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+ exec("ALTER TABLE foo3 RENAME TO foo4");
+ }
+
+ @Test
+ public void testAlterTableRenameFail() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ whFs.mkdirs(getTablePath("default", "foo1"), perm500); //revoke write
+ execFail("ALTER TABLE foo1 RENAME TO foo2");
+
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
+ whFs.mkdirs(tablePath, perm500); //revoke write
+ execFail("ALTER TABLE foo3 RENAME TO foo4");
+ }
+
+ @Test
+ public void testAlterTableRelocate() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ exec("ALTER TABLE foo1 SET LOCATION '%s'", tablePath.makeQualified(whFs));
+
+ tablePath = new Path(whPath, new Random().nextInt() + "/mytable2");
+ exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s'",
+ tablePath.makeQualified(whFs));
+ tablePath = new Path(whPath, new Random().nextInt() + "/mytable2");
+ exec("ALTER TABLE foo3 SET LOCATION '%s'", tablePath.makeQualified(whFs));
+ }
+
+ @Test
+ public void testAlterTableRelocateFail() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
+ Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
+ whFs.mkdirs(tablePath, perm500); //revoke write
+ execFail("ALTER TABLE foo1 SET LOCATION '%s'", tablePath.makeQualified(whFs));
+
+ //don't have write access to the new table location
+ tablePath = new Path(whPath, new Random().nextInt() + "/mytable2");
+ exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s'",
+ tablePath.makeQualified(whFs));
+ tablePath = new Path(whPath, new Random().nextInt() + "/mytable2");
+ whFs.mkdirs(tablePath, perm500); //revoke write
+ execFail("ALTER TABLE foo3 SET LOCATION '%s'", tablePath.makeQualified(whFs));
+
+ //have write access to the new table location, but not to the old one
+ tablePath = new Path(whPath, new Random().nextInt() + "/mytable3");
+ exec("CREATE EXTERNAL TABLE foo4 (foo INT) STORED AS RCFILE LOCATION '%s'",
+ tablePath.makeQualified(whFs));
+ whFs.mkdirs(tablePath, perm500); //revoke write
+ tablePath = new Path(whPath, new Random().nextInt() + "/mytable3");
+ execFail("ALTER TABLE foo4 SET LOCATION '%s'", tablePath.makeQualified(whFs));
+ }
+
+ @Test
+ public void testAlterTable() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
+ exec("ALTER TABLE foo1 SET TBLPROPERTIES ('foo'='bar')");
+ exec("ALTER TABLE foo1 SET SERDEPROPERTIES ('foo'='bar')");
+ exec("ALTER TABLE foo1 ADD COLUMNS (foo2 INT)");
+ }
+
+ @Test
+ public void testAddDropPartition() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10')");
+ exec("ALTER TABLE foo1 ADD IF NOT EXISTS PARTITION (b='2010-10-10')");
+ String relPath = new Random().nextInt() + "/mypart";
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-11') LOCATION '%s'", relPath);
+
+ exec("ALTER TABLE foo1 PARTITION (b='2010-10-10') SET FILEFORMAT RCFILE");
+
+ exec("ALTER TABLE foo1 PARTITION (b='2010-10-10') SET FILEFORMAT INPUTFORMAT "
+ + "'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT "
+ + "'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' inputdriver "
+ + "'mydriver' outputdriver 'yourdriver'");
+
+ exec("ALTER TABLE foo1 DROP PARTITION (b='2010-10-10')");
+ exec("ALTER TABLE foo1 DROP PARTITION (b='2010-10-11')");
+ }
+
+ @Test
+ public void testAddPartitionFail1() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
+ whFs.mkdirs(getTablePath("default", "foo1"), perm500);
+ execFail("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10')");
+ }
+
+ @Test
+ public void testAddPartitionFail2() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
+ String relPath = new Random().nextInt() + "/mypart";
+ Path partPath = new Path(getTablePath("default", "foo1"), relPath);
+ whFs.mkdirs(partPath, perm500);
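+ // note: the ADD PARTITION is still expected to succeed even though its location is read-only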
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10') LOCATION '%s'", partPath);
+ }
+
+ @Test
+ public void testDropPartitionFail1() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10')");
+ whFs.mkdirs(getPartPath("b=2010-10-10", "default", "foo1"), perm500);
+ execFail("ALTER TABLE foo1 DROP PARTITION (b='2010-10-10')");
+ }
+
+ @Test
+ public void testDropPartitionFail2() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
+ String relPath = new Random().nextInt() + "/mypart";
+ Path partPath = new Path(getTablePath("default", "foo1"), relPath);
+ whFs.mkdirs(partPath, perm700);
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10') LOCATION '%s'", partPath);
+ whFs.mkdirs(partPath, perm500); //revoke write
+ execFail("ALTER TABLE foo1 DROP PARTITION (b='2010-10-10')");
+ }
+
+ @Test
+ public void testAlterTableFail() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (boo STRING) STORED AS TEXTFILE");
+ whFs.mkdirs(getTablePath("default", "foo1"), perm500); //revoke write
+ execFail("ALTER TABLE foo1 SET TBLPROPERTIES ('foo'='bar')");
+ execFail("ALTER TABLE foo1 SET SERDEPROPERTIES ('foo'='bar')");
+ execFail("ALTER TABLE foo1 ADD COLUMNS (foo2 INT)");
+ }
+
+ @Test
+ public void testShowTables() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (boo STRING) STORED AS TEXTFILE");
+ exec("SHOW PARTITIONS foo1");
+
+ whFs.mkdirs(getTablePath("default", "foo1"), perm300); //revoke read
+ execFail("SHOW PARTITIONS foo1");
+ }
+
+ @Test
+ public void testAlterTablePartRename() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS RCFILE");
+ Path loc = new Path(whPath, new Random().nextInt() + "/mypart");
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-16') LOCATION '%s'", loc);
+ exec("ALTER TABLE foo1 PARTITION (b='2010-10-16') RENAME TO PARTITION (b='2010-10-17')");
+ }
+
+ @Test
+ public void testAlterTablePartRenameFail() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS RCFILE");
+ Path loc = new Path(whPath, new Random().nextInt() + "/mypart");
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-16') LOCATION '%s'", loc);
+ whFs.setPermission(loc, perm500); //revoke w
+ execFail("ALTER TABLE foo1 PARTITION (b='2010-10-16') RENAME TO PARTITION (b='2010-10-17')");
+ }
+
+ @Test
+ public void testAlterTablePartRelocate() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS RCFILE");
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-16')");
+ Path partPath = new Path(whPath, new Random().nextInt() + "/mypart");
+ exec("ALTER TABLE foo1 PARTITION (b='2010-10-16') SET LOCATION '%s'", partPath.makeQualified(whFs));
+ }
+
+ @Test
+ public void testAlterTablePartRelocateFail() throws Exception {
+ exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS RCFILE");
+
+ Path oldLoc = new Path(whPath, new Random().nextInt() + "/mypart");
+ Path newLoc = new Path(whPath, new Random().nextInt() + "/mypart2");
+
+ exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-16') LOCATION '%s'", oldLoc);
+ whFs.mkdirs(oldLoc, perm500);
+ execFail("ALTER TABLE foo1 PARTITION (b='2010-10-16') SET LOCATION '%s'", newLoc.makeQualified(whFs));
+ whFs.mkdirs(oldLoc, perm700);
+ whFs.mkdirs(newLoc, perm500);
+ execFail("ALTER TABLE foo1 PARTITION (b='2010-10-16') SET LOCATION '%s'", newLoc.makeQualified(whFs));
+ }
+
+}