Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/07 16:34:34 UTC
[1/3] SQOOP-931: Integrate HCatalog with Sqoop
Updated Branches:
refs/heads/trunk b07906a2a -> 5e88d43b5
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java
new file mode 100644
index 0000000..77bafcc
--- /dev/null
+++ b/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hcat;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator;
+import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode;
+import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.junit.Before;
+
+import com.cloudera.sqoop.testutil.ExportJobTestCase;
+
+/**
+ * Test that we can export HCatalog tables into databases.
+ */
+public class HCatalogExportTest extends ExportJobTestCase {
+ private static final Log LOG =
+ LogFactory.getLog(HCatalogExportTest.class);
+ private HCatalogTestUtils utils = HCatalogTestUtils.instance();
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+ try {
+ utils.initUtils();
+ } catch (Exception e) {
+ throw new RuntimeException("Error initializing HCatTestUtilis", e);
+ }
+ }
+ /**
+ * @return an argv for the CodeGenTool to use when creating tables to export.
+ */
+ protected String[] getCodeGenArgv(String... extraArgs) {
+ List<String> codeGenArgv = new ArrayList<String>();
+
+ if (null != extraArgs) {
+ for (String arg : extraArgs) {
+ codeGenArgv.add(arg);
+ }
+ }
+
+ codeGenArgv.add("--table");
+ codeGenArgv.add(getTableName());
+ codeGenArgv.add("--connect");
+ codeGenArgv.add(getConnectString());
+ codeGenArgv.add("--hcatalog-table");
+ codeGenArgv.add(getTableName());
+
+ return codeGenArgv.toArray(new String[0]);
+ }
+
+ /**
+ * Verify that, for the rows holding the minimum and maximum 'id' values, the
+ * given column contains the expected database values.
+ */
+ protected void assertColMinAndMax(String colName, ColumnGenerator generator)
+ throws SQLException {
+ Connection conn = getConnection();
+ int minId = getMinRowId(conn);
+ int maxId = getMaxRowId(conn);
+ String table = getTableName();
+ LOG.info("Checking min/max for column " + colName + " with type "
+ + SqoopHCatUtilities.sqlTypeString(generator.getSqlType()));
+
+ Object expectedMin = generator.getDBValue(minId);
+ Object expectedMax = generator.getDBValue(maxId);
+
+ utils.assertSqlColValForRowId(conn, table, minId, colName, expectedMin);
+ utils.assertSqlColValForRowId(conn, table, maxId, colName, expectedMax);
+ }
+
+ private void runHCatExport(List<String> addlArgsArray,
+ final int totalRecords, String table,
+ ColumnGenerator[] cols) throws Exception {
+ utils.createHCatTable(CreateMode.CREATE_AND_LOAD,
+ totalRecords, table, cols);
+ utils.createSqlTable(getConnection(), true, totalRecords, table, cols);
+ Map<String, String> addlArgsMap = utils.getAddlTestArgs();
+ addlArgsArray.add("--verbose");
+ addlArgsArray.add("-m");
+ addlArgsArray.add("1");
+ addlArgsArray.add("--hcatalog-table");
+ addlArgsArray.add(table);
+ String[] argv = {};
+
+ if (addlArgsMap.containsKey("-libjars")) {
+ argv = new String[2];
+ argv[0] = "-libjars";
+ argv[1] = addlArgsMap.get("-libjars");
+ }
+ for (String k : addlArgsMap.keySet()) {
+ if (!k.equals("-libjars")) {
+ addlArgsArray.add(k);
+ addlArgsArray.add(addlArgsMap.get(k));
+ }
+ }
+ String[] exportArgs = getArgv(true, 10, 10, newStrArray(argv,
+ addlArgsArray.toArray(new String[0])));
+ LOG.debug("Export args = " + Arrays.toString(exportArgs));
+ SqoopHCatUtilities.instance().setConfigured(false);
+ runExport(exportArgs);
+ verifyExport(totalRecords);
+ for (int i = 0; i < cols.length; i++) {
+ assertColMinAndMax(HCatalogTestUtils.forIdx(i), cols[i]);
+ }
+ }
+
+ public void testIntTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "boolean", Types.BOOLEAN, HCatFieldSchema.Type.BOOLEAN,
+ Boolean.TRUE, Boolean.TRUE, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "tinyint", Types.INTEGER, HCatFieldSchema.Type.INT, 10,
+ 10, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+ "smallint", Types.INTEGER, HCatFieldSchema.Type.INT, 100,
+ 100, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(3),
+ "int", Types.INTEGER, HCatFieldSchema.Type.INT, 1000,
+ 1000, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(4),
+ "bigint", Types.BIGINT, HCatFieldSchema.Type.BIGINT, 10000L,
+ 10000L, KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testFloatTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "float", Types.FLOAT, HCatFieldSchema.Type.FLOAT, 10.0F,
+ 10.F, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "real", Types.FLOAT, HCatFieldSchema.Type.FLOAT, 20.0F,
+ 20.0F, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+ "double", Types.DOUBLE, HCatFieldSchema.Type.DOUBLE, 30.0D,
+ 30.0D, KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testNumberTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "numeric(18,2)", Types.NUMERIC, HCatFieldSchema.Type.STRING, "1000",
+ new BigDecimal("1000"), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "decimal(18,2)", Types.DECIMAL, HCatFieldSchema.Type.STRING, "2000",
+ new BigDecimal("2000"), KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testDateTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "date", Types.DATE, HCatFieldSchema.Type.STRING, "2013-12-31",
+ new Date(113, 11, 31), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "time", Types.TIME, HCatFieldSchema.Type.STRING, "10:11:12",
+ new Time(10, 11, 12), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+ "timestamp", Types.TIMESTAMP, HCatFieldSchema.Type.STRING,
+ "2013-12-31 10:11:12", new Timestamp(113, 11, 31, 10, 11, 12, 0),
+ KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testDateTypesToBigInt() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ long offset = TimeZone.getDefault().getRawOffset();
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "date", Types.DATE, HCatFieldSchema.Type.BIGINT, 0 - offset,
+ new Date(70, 0, 1), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "time", Types.TIME, HCatFieldSchema.Type.BIGINT, 36672000L - offset,
+ new Time(10, 11, 12), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+ "timestamp", Types.TIMESTAMP, HCatFieldSchema.Type.BIGINT,
+ 36672000L - offset, new Timestamp(70, 0, 1, 10, 11, 12, 0),
+ KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--map-column-hive");
+ addlArgsArray.add("COL0=bigint,COL1=bigint,COL2=bigint");
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testStringTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "char(10)", Types.CHAR, HCatFieldSchema.Type.STRING, "string to test",
+ "string to test", KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "longvarchar", Types.LONGVARCHAR, HCatFieldSchema.Type.STRING,
+ "string to test", "string to test", KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testBinaryTypes() throws Exception {
+ ByteBuffer bb = ByteBuffer.wrap(new byte[] { 0, 1, 2 });
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "binary(10)", Types.BINARY, HCatFieldSchema.Type.BINARY,
+ bb.array(), bb.array(), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varbinary(10)", Types.BINARY, HCatFieldSchema.Type.BINARY,
+ bb.array(), bb.array(), KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testColumnProjection() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", null, KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--columns");
+ addlArgsArray.add("ID,MSG");
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testStaticPartitioning() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.STATIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col0");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("1");
+
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testDynamicPartitioning() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.DYNAMIC_KEY),
+ };
+
+ List<String> addlArgsArray = new ArrayList<String>();
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testStaticAndDynamicPartitioning() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.STATIC_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "2", "2", KeyType.DYNAMIC_KEY),
+ };
+
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col0");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("1");
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ /**
+ * Test other file formats.
+ */
+ public void testSequenceFile() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+ "1", KeyType.STATIC_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+ "2", KeyType.DYNAMIC_KEY), };
+
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col0");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("1");
+ utils.setStorageInfo(HCatalogTestUtils.STORED_AS_SEQFILE);
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+
+ public void testTextFile() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+ "1", KeyType.STATIC_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+ "2", KeyType.DYNAMIC_KEY), };
+
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col0");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("1");
+ utils.setStorageInfo(HCatalogTestUtils.STORED_AS_TEXT);
+ runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+ }
+}
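
For reference, the export tests above ultimately run Sqoop's export tool with an ordinary argument vector. Below is a minimal, illustrative sketch (not part of the patch) of the kind of arguments runHCatExport() assembles before calling runExport(); the connect string and table name are hypothetical, and in the real tests the remaining options (for example --connect and --table) come from ExportJobTestCase.getArgv().

    // Illustrative sketch, not part of the patch; names below are hypothetical.
    public class HCatExportArgsSketch {
      public static void main(String[] args) {
        String[] exportArgs = {
          "--connect", "jdbc:hsqldb:hsql://localhost/sqooptest", // hypothetical URL
          "--table", "FOO",                                      // hypothetical table
          "--hcatalog-table", "FOO",                             // export rows read from this HCatalog table
          "--verbose",
          "-m", "1",                                             // single mapper, as in the tests
        };
        // The tests pass an equivalent array to runExport(exportArgs).
        System.out.println(String.join(" ", exportArgs));
      }
    }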
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
new file mode 100644
index 0000000..293015e
--- /dev/null
+++ b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
@@ -0,0 +1,712 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hcat;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator;
+import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode;
+import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.junit.Before;
+
+import com.cloudera.sqoop.Sqoop;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.tool.SqoopTool;
+
+/**
+ * Test that we can import database tables into HCatalog.
+ */
+public class HCatalogImportTest extends ImportJobTestCase {
+ private static final Log LOG =
+ LogFactory.getLog(HCatalogImportTest.class);
+ private final HCatalogTestUtils utils = HCatalogTestUtils.instance();
+ private List<String> extraTestArgs = null;
+ private List<String> configParams = null;
+
+ @Override
+ @Before
+ public void setUp() {
+ super.setUp();
+ try {
+ utils.initUtils();
+ extraTestArgs = new ArrayList<String>();
+ configParams = new ArrayList<String>();
+ } catch (Exception e) {
+ throw new RuntimeException("Error initializing HCatTestUtilis", e);
+ }
+ }
+
+ /**
+ * @return an argv for the CodeGenTool to use when creating tables to import.
+ */
+ protected String[] getCodeGenArgv(String... extraArgs) {
+ List<String> codeGenArgv = new ArrayList<String>();
+
+ if (null != extraArgs) {
+ for (String arg : extraArgs) {
+ codeGenArgv.add(arg);
+ }
+ }
+
+ codeGenArgv.add("--table");
+ codeGenArgv.add(getTableName());
+ codeGenArgv.add("--connect");
+ codeGenArgv.add(getConnectString());
+ codeGenArgv.add("--hcatalog-table");
+ codeGenArgv.add(getTableName());
+
+ return codeGenArgv.toArray(new String[0]);
+ }
+
+ private void setExtraArgs(List<String> args) {
+ extraTestArgs.clear();
+ if (args != null && args.size() > 0) {
+ extraTestArgs.addAll(args);
+ }
+ }
+
+ private List<String> getConfigParams() {
+ return configParams;
+ }
+
+ private void setConfigParams(List<String> params) {
+ configParams.clear();
+ if (params != null && params.size() > 0) {
+ configParams.addAll(params);
+ }
+ }
+ @Override
+ protected List<String> getExtraArgs(Configuration conf) {
+ List<String> addlArgsArray = new ArrayList<String>();
+ if (extraTestArgs != null && extraTestArgs.size() > 0) {
+ addlArgsArray.addAll(extraTestArgs);
+ }
+ Map<String, String> addlArgsMap = utils.getAddlTestArgs();
+ String[] argv = {};
+
+ if (addlArgsMap.containsKey("-libjars")) {
+ argv = new String[2];
+ argv[0] = "-libjars";
+ argv[1] = addlArgsMap.get("-libjars");
+ }
+ addlArgsArray.add("-m");
+ addlArgsArray.add("1");
+ addlArgsArray.add("--hcatalog-table");
+ addlArgsArray.add(getTableName());
+ for (String k : addlArgsMap.keySet()) {
+ if (!k.equals("-libjars")) {
+ addlArgsArray.add(k);
+ addlArgsArray.add(addlArgsMap.get(k));
+ }
+ }
+ return addlArgsArray;
+ }
+
+ @Override
+ protected String[] getArgv(boolean includeHadoopFlags, String[] colNames,
+ Configuration conf) {
+ if (null == colNames) {
+ colNames = getColNames();
+ }
+ String columnsString = "";
+ String splitByCol = null;
+ if (colNames != null) {
+ splitByCol = colNames[0];
+ for (String col : colNames) {
+ columnsString += col + ",";
+ }
+ }
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ CommonArgs.addHadoopFlags(args);
+ }
+ args.addAll(getConfigParams());
+ args.add("--table");
+ args.add(getTableName());
+ if (colNames != null) {
+ args.add("--columns");
+ args.add(columnsString);
+ args.add("--split-by");
+ args.add(splitByCol);
+ }
+ args.add("--hcatalog-table");
+ args.add(getTableName());
+ args.add("--connect");
+ args.add(getConnectString());
+ args.addAll(getExtraArgs(conf));
+
+ return args.toArray(new String[0]);
+ }
+
+ private void validateHCatRecords(final List<HCatRecord> recs,
+ final HCatSchema schema, int expectedCount,
+ ColumnGenerator... cols) throws IOException {
+ if (recs.size() != expectedCount) {
+ fail("Expected records = " + expectedCount
+ + ", actual = " + recs.size());
+ return;
+ }
+ schema.getFieldNames();
+ Collections.sort(recs, new Comparator<HCatRecord>() {
+ @Override
+ public int compare(HCatRecord hr1, HCatRecord hr2) {
+ try {
+ return hr1.getInteger("id", schema)
+ - hr2.getInteger("id", schema);
+ } catch (Exception e) {
+ LOG.warn("Exception caught while sorting hcat records " + e);
+ }
+ return 0;
+ }
+ });
+
+ Object expectedVal = null;
+ Object actualVal = null;
+ for (int i = 0; i < recs.size(); ++i) {
+ HCatRecord rec = recs.get(i);
+ expectedVal = i;
+ actualVal = rec.get("id", schema);
+ LOG.info("Validating field: id (expected = "
+ + expectedVal + ", actual = " + actualVal + ")");
+ HCatalogTestUtils.assertEquals(expectedVal, actualVal);
+ expectedVal = "textfield" + i;
+ actualVal = rec.get("msg", schema);
+ LOG.info("Validating field: msg (expected = "
+ + expectedVal + ", actual = " + actualVal + ")");
+ HCatalogTestUtils.assertEquals(expectedVal, actualVal);
+ for (ColumnGenerator col : cols) {
+ String name = col.getName().toLowerCase();
+ expectedVal = col.getHCatValue(i);
+ actualVal = rec.get(name, schema);
+ LOG.info("Validating field: " + name + " (expected = "
+ + expectedVal + ", actual = " + actualVal + ")");
+ HCatalogTestUtils.assertEquals(expectedVal, actualVal);
+ }
+ }
+ }
+
+ protected void runImport(SqoopTool tool, String[] argv) throws IOException {
+ // run the tool through the normal entry-point.
+ int ret;
+ try {
+ Configuration conf = getConf();
+ SqoopOptions opts = getSqoopOptions(conf);
+ Sqoop sqoop = new Sqoop(tool, conf, opts);
+ ret = Sqoop.runSqoop(sqoop, argv);
+ } catch (Exception e) {
+ LOG.error("Got exception running import: " + e.toString());
+ e.printStackTrace();
+ ret = 1;
+ }
+ if (0 != ret) {
+ throw new IOException("Import failure; return status " + ret);
+ }
+ }
+
+ private void runHCatImport(List<String> addlArgsArray,
+ int totalRecords, String table, ColumnGenerator[] cols,
+ String[] cNames) throws Exception {
+ runHCatImport(addlArgsArray, totalRecords, table, cols, cNames, false);
+ }
+
+ private void runHCatImport(List<String> addlArgsArray,
+ int totalRecords, String table, ColumnGenerator[] cols,
+ String[] cNames, boolean dontCreate) throws Exception {
+ CreateMode mode = CreateMode.CREATE;
+ if (dontCreate) {
+ mode = CreateMode.NO_CREATION;
+ }
+ HCatSchema tblSchema =
+ utils.createHCatTable(mode, totalRecords, table, cols);
+ utils.createSqlTable(getConnection(), false, totalRecords, table, cols);
+ Map<String, String> addlArgsMap = utils.getAddlTestArgs();
+ String[] argv = {};
+ addlArgsArray.add("-m");
+ addlArgsArray.add("1");
+ addlArgsArray.add("--hcatalog-table");
+ addlArgsArray.add(table);
+ if (addlArgsMap.containsKey("-libjars")) {
+ argv = new String[2];
+ argv[0] = "-libjars";
+ argv[1] = addlArgsMap.get("-libjars");
+ }
+ for (String k : addlArgsMap.keySet()) {
+ if (!k.equals("-libjars")) {
+ addlArgsArray.add(k);
+ addlArgsArray.add(addlArgsMap.get(k));
+ }
+ }
+ String[] colNames = null;
+ if (cNames != null) {
+ colNames = cNames;
+ } else {
+ colNames = new String[2 + cols.length];
+ colNames[0] = "ID";
+ colNames[1] = "MSG";
+ for (int i = 0; i < cols.length; ++i) {
+ colNames[2 + i] = cols[i].getName().toUpperCase();
+ }
+ }
+ String[] importArgs = getArgv(true, colNames, new Configuration());
+ LOG.debug("Import args = " + Arrays.toString(importArgs));
+ SqoopHCatUtilities.instance().setConfigured(false);
+ runImport(new ImportTool(), importArgs);
+ List<HCatRecord> recs = utils.readHCatRecords(null, table, null);
+ LOG.debug("HCat records ");
+ LOG.debug(utils.hCatRecordDump(recs, tblSchema));
+ validateHCatRecords(recs, tblSchema, 10, cols);
+ }
+
+ public void testIntTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "boolean", Types.BOOLEAN, HCatFieldSchema.Type.BOOLEAN,
+ Boolean.TRUE, Boolean.TRUE, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "tinyint", Types.INTEGER, HCatFieldSchema.Type.INT, 10,
+ 10, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+ "smallint", Types.INTEGER, HCatFieldSchema.Type.INT, 100,
+ 100, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(3),
+ "int", Types.INTEGER, HCatFieldSchema.Type.INT, 1000,
+ 1000, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(4),
+ "bigint", Types.BIGINT, HCatFieldSchema.Type.BIGINT, 10000L,
+ 10000L, KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testFloatTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "float", Types.FLOAT, HCatFieldSchema.Type.FLOAT, 10.0F,
+ 10.F, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "real", Types.FLOAT, HCatFieldSchema.Type.FLOAT, 20.0F,
+ 20.0F, KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+ "double", Types.DOUBLE, HCatFieldSchema.Type.DOUBLE, 30.0D,
+ 30.0D, KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testNumberTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "numeric(18,2)", Types.NUMERIC, HCatFieldSchema.Type.STRING, "1000",
+ new BigDecimal("1000"), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "decimal(18,2)", Types.DECIMAL, HCatFieldSchema.Type.STRING, "2000",
+ new BigDecimal("2000"), KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testDateTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "date", Types.DATE, HCatFieldSchema.Type.STRING, "2013-12-31",
+ new Date(113, 11, 31), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "time", Types.TIME, HCatFieldSchema.Type.STRING, "10:11:12",
+ new Time(10, 11, 12), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+ "timestamp", Types.TIMESTAMP, HCatFieldSchema.Type.STRING,
+ "2013-12-31 10:11:12.0", new Timestamp(113, 11, 31, 10, 11, 12, 0),
+ KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testDateTypesToBigInt() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ long offset = TimeZone.getDefault().getRawOffset();
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "date", Types.DATE, HCatFieldSchema.Type.BIGINT, 0 - offset,
+ new Date(70, 0, 1), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "time", Types.TIME, HCatFieldSchema.Type.BIGINT, 36672000L - offset,
+ new Time(10, 11, 12), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+ "timestamp", Types.TIMESTAMP, HCatFieldSchema.Type.BIGINT,
+ 36672000L - offset, new Timestamp(70, 0, 1, 10, 11, 12, 0),
+ KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--map-column-hive");
+ addlArgsArray.add("COL0=bigint,COL1=bigint,COL2=bigint");
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testStringTypes() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "char(10)", Types.CHAR, HCatFieldSchema.Type.STRING, "string to test",
+ "string to test", KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "longvarchar", Types.LONGVARCHAR, HCatFieldSchema.Type.STRING,
+ "string to test", "string to test", KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testBinaryTypes() throws Exception {
+ ByteBuffer bb = ByteBuffer.wrap(new byte[] { 0, 1, 2 });
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "binary(10)", Types.BINARY, HCatFieldSchema.Type.BINARY,
+ bb.array(), bb.array(), KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "longvarbinary", Types.BINARY, HCatFieldSchema.Type.BINARY,
+ bb.array(), bb.array(), KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testColumnProjection() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ null, null, KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ List<String> cfgParams = new ArrayList<String>();
+ cfgParams.add("-D");
+ cfgParams.add(SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP
+ + "=true");
+ setConfigParams(cfgParams);
+ String[] colNames = new String[] { "ID", "MSG" };
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, colNames);
+ }
+
+ public void testColumnProjectionMissingPartKeys() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ null, null, KeyType.DYNAMIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ List<String> cfgParams = new ArrayList<String>();
+ cfgParams.add("-D");
+ cfgParams.add(SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP
+ + "=true");
+ setConfigParams(cfgParams);
+ String[] colNames = new String[] { "ID", "MSG" };
+ try {
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, colNames);
+ fail("Column projection with missing dynamic partition keys must fail");
+ } catch (Throwable t) {
+ LOG.info("Job fails as expected : " + t);
+ StringWriter sw = new StringWriter();
+ t.printStackTrace(new PrintWriter(sw));
+ LOG.info("Exception stack trace = " + sw);
+ }
+ }
+
+ public void testStaticPartitioning() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.STATIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col0");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("1");
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testDynamicPartitioning() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.DYNAMIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testStaticAndDynamicPartitioning() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.STATIC_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "2", "2", KeyType.DYNAMIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col0");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("1");
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ /**
+ * Test other file formats.
+ */
+ public void testSequenceFile() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+ "1", KeyType.STATIC_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+ "2", KeyType.DYNAMIC_KEY), };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col0");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("1");
+ setExtraArgs(addlArgsArray);
+ utils.setStorageInfo(HCatalogTestUtils.STORED_AS_SEQFILE);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testTextFile() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.STATIC_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "2", "2", KeyType.DYNAMIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col0");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("1");
+ setExtraArgs(addlArgsArray);
+ utils.setStorageInfo(HCatalogTestUtils.STORED_AS_TEXT);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testTableCreation() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.STATIC_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "2", "2", KeyType.DYNAMIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--create-hcatalog-table");
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true);
+ }
+
+ public void testTableCreationWithPartition() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "2", "2", KeyType.STATIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col1");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("2");
+ addlArgsArray.add("--create-hcatalog-table");
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true);
+ }
+
+ public void testTableCreationWithStorageStanza() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "1", "1", KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "2", "2", KeyType.STATIC_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-partition-key");
+ addlArgsArray.add("col1");
+ addlArgsArray.add("--hive-partition-value");
+ addlArgsArray.add("2");
+ addlArgsArray.add("--create-hcatalog-table");
+ addlArgsArray.add("--hcatalog-storage-stanza");
+ addlArgsArray.add(HCatalogTestUtils.STORED_AS_TEXT);
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true);
+ }
+
+ public void testHiveDropDelims() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "Test", "\u0001\n\rTest", KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "Test2", "\u0001\r\nTest2", KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-drop-import-delims");
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testHiveDelimsReplacement() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "^^^Test", "\u0001\n\rTest", KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+ "^^^Test2", "\u0001\r\nTest2", KeyType.NOT_A_KEY),
+ };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--hive-delims-replacement");
+ addlArgsArray.add("^");
+ setExtraArgs(addlArgsArray);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testDynamicKeyInMiddle() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+ "1", KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+ "2", KeyType.DYNAMIC_KEY), };
+ List<String> addlArgsArray = new ArrayList<String>();
+ setExtraArgs(addlArgsArray);
+ utils.setStorageInfo(HCatalogTestUtils.STORED_AS_SEQFILE);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+ }
+
+ public void testCreateTableWithPreExistingTable() throws Exception {
+ final int TOTAL_RECORDS = 1 * 10;
+ String table = getTableName().toUpperCase();
+ ColumnGenerator[] cols = new ColumnGenerator[] {
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+ "1", KeyType.NOT_A_KEY),
+ HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+ "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+ "2", KeyType.DYNAMIC_KEY), };
+ List<String> addlArgsArray = new ArrayList<String>();
+ addlArgsArray.add("--create-hcatalog-table");
+ setExtraArgs(addlArgsArray);
+ try {
+ // Precreate table
+ utils.createHCatTable(CreateMode.CREATE, TOTAL_RECORDS, table, cols);
+ runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true);
+ fail("HCatalog job with --create-hcatalog-table and pre-existing"
+ + " table should fail");
+ } catch (Exception e) {
+ LOG.debug("Caught expected exception while running "
+ + " create-hcatalog-table with pre-existing table test", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java b/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java
new file mode 100644
index 0000000..ddae5f5
--- /dev/null
+++ b/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java
@@ -0,0 +1,855 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hcat;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.junit.Assert;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+
+/**
+ * HCatalog common test utilities.
+ *
+ */
+public final class HCatalogTestUtils {
+ protected Configuration conf;
+ private static List<HCatRecord> recsToLoad = new ArrayList<HCatRecord>();
+ private static List<HCatRecord> recsRead = new ArrayList<HCatRecord>();
+ private static final Log LOG = LogFactory.getLog(HCatalogTestUtils.class);
+ private FileSystem fs;
+ private final SqoopHCatUtilities utils = SqoopHCatUtilities.instance();
+ private static final double DELTAVAL = 1e-10;
+ public static final String SQOOP_HCATALOG_TEST_ARGS =
+ "sqoop.hcatalog.test.args";
+ private final boolean initialized = false;
+ private static String storageInfo = null;
+ public static final String STORED_AS_RCFILE = "stored as\n\trcfile\n";
+ public static final String STORED_AS_SEQFILE = "stored as\n\tsequencefile\n";
+ public static final String STORED_AS_TEXT = "stored as\n\ttextfile\n";
+
+ private HCatalogTestUtils() {
+ }
+
+ private static final class Holder {
+ @SuppressWarnings("synthetic-access")
+ private static final HCatalogTestUtils INSTANCE = new HCatalogTestUtils();
+
+ private Holder() {
+ }
+ }
+
+ @SuppressWarnings("synthetic-access")
+ public static HCatalogTestUtils instance() {
+ return Holder.INSTANCE;
+ }
+
+ public void initUtils() throws IOException, MetaException {
+ if (initialized) {
+ return;
+ }
+ conf = new Configuration();
+ if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+ conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+ }
+ fs = FileSystem.get(conf);
+ fs.initialize(fs.getWorkingDirectory().toUri(), conf);
+ storageInfo = null;
+ SqoopHCatUtilities.setTestMode(true);
+ }
+
+ public static String getStorageInfo() {
+ if (null != storageInfo && storageInfo.length() > 0) {
+ return storageInfo;
+ } else {
+ return STORED_AS_RCFILE;
+ }
+ }
+
+ public void setStorageInfo(String info) {
+ storageInfo = info;
+ }
+
+ private static String getDropTableCmd(final String dbName,
+ final String tableName) {
+ return "DROP TABLE IF EXISTS " + dbName.toLowerCase() + "."
+ + tableName.toLowerCase();
+ }
+
+ private static String getHCatCreateTableCmd(String dbName,
+ String tableName, List<HCatFieldSchema> tableCols,
+ List<HCatFieldSchema> partKeys) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("create table ").append(dbName.toLowerCase()).append('.');
+ sb.append(tableName.toLowerCase()).append(" (\n\t");
+ for (int i = 0; i < tableCols.size(); ++i) {
+ HCatFieldSchema hfs = tableCols.get(i);
+ if (i > 0) {
+ sb.append(",\n\t");
+ }
+ sb.append(hfs.getName().toLowerCase());
+ sb.append(' ').append(hfs.getTypeString());
+ }
+ sb.append(")\n");
+ if (partKeys != null && partKeys.size() > 0) {
+ sb.append("partitioned by (\n\t");
+ for (int i = 0; i < partKeys.size(); ++i) {
+ HCatFieldSchema hfs = partKeys.get(i);
+ if (i > 0) {
+ sb.append("\n\t,");
+ }
+ sb.append(hfs.getName().toLowerCase());
+ sb.append(' ').append(hfs.getTypeString());
+ }
+ sb.append(")\n");
+ }
+ sb.append(getStorageInfo());
+ LOG.info("Create table command : " + sb);
+ return sb.toString();
+ }
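
As a worked example (illustrative only, not part of the patch): for a hypothetical table named foo in the default HCatalog database, with the standard test columns id int and msg string plus one partition key col0 string, getHCatCreateTableCmd() above builds roughly the following DDL, where the trailing storage clause comes from getStorageInfo() (RCFile unless a test overrides it via setStorageInfo()):

    create table default.foo (
        id int,
        msg string)
    partitioned by (
        col0 string)
    stored as
        rcfile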
+
+ /**
+ * Create an HCatalog table with the given columns and partition keys,
+ * dropping any existing table of the same name first.
+ */
+ public void createHCatTableUsingSchema(String dbName,
+ String tableName, List<HCatFieldSchema> tableCols,
+ List<HCatFieldSchema> partKeys)
+ throws Exception {
+
+ String databaseName = dbName == null
+ ? SqoopHCatUtilities.DEFHCATDB : dbName;
+ LOG.info("Dropping HCatalog table if it exists " + databaseName
+ + '.' + tableName);
+ String dropCmd = getDropTableCmd(databaseName, tableName);
+
+ try {
+ utils.launchHCatCli(dropCmd);
+ } catch (Exception e) {
+ LOG.debug("Drop hcatalog table exception : " + e);
+ LOG.info("Unable to drop table." + dbName + "."
+ + tableName + ". Assuming it did not exist");
+ }
+ LOG.info("Creating HCatalog table if it exists " + databaseName
+ + '.' + tableName);
+ String createCmd = getHCatCreateTableCmd(databaseName, tableName,
+ tableCols, partKeys);
+ utils.launchHCatCli(createCmd);
+ LOG.info("Created HCatalog table " + dbName + "." + tableName);
+ }
+
+ /**
+ * The record-writer mapper for HCatalog tables; it writes records from an
+ * in-memory list.
+ */
+ public static class HCatWriterMapper extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ private static int writtenRecordCount = 0;
+
+ public static int getWrittenRecordCount() {
+ return writtenRecordCount;
+ }
+
+ public static void setWrittenRecordCount(int count) {
+ HCatWriterMapper.writtenRecordCount = count;
+ }
+
+ @Override
+ public void map(LongWritable key, Text value,
+ Context context)
+ throws IOException, InterruptedException {
+ try {
+ HCatRecord rec = recsToLoad.get(writtenRecordCount);
+ context.write(null, rec);
+ writtenRecordCount++;
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ e.printStackTrace(System.err);
+ }
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * The record-reader mapper for HCatalog tables; it reads records into an
+ * in-memory list.
+ */
+ public static class HCatReaderMapper extends
+ Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+
+ private static int readRecordCount = 0; // test will be in local mode
+
+ public static int getReadRecordCount() {
+ return readRecordCount;
+ }
+
+ public static void setReadRecordCount(int count) {
+ HCatReaderMapper.readRecordCount = count;
+ }
+
+ @Override
+ public void map(WritableComparable key, HCatRecord value,
+ Context context) throws IOException, InterruptedException {
+ try {
+ recsRead.add(value);
+ readRecordCount++;
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ e.printStackTrace(System.err);
+ }
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private void createInputFile(Path path, int rowCount)
+ throws IOException {
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ FSDataOutputStream os = fs.create(path);
+ for (int i = 0; i < rowCount; i++) {
+ String s = i + "\n";
+ os.writeChars(s);
+ }
+ os.close();
+ }
+
+ public List<HCatRecord> loadHCatTable(String dbName,
+ String tableName, Map<String, String> partKeyMap,
+ HCatSchema tblSchema, List<HCatRecord> records)
+ throws Exception {
+
+ Job job = new Job(conf, "HCat load job");
+
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatWriterMapper.class);
+
+
+ // Write one input line per record to drive the mapper
+ Path path = new Path(fs.getWorkingDirectory(),
+ "mapreduce/HCatTableIndexInput");
+
+ job.getConfiguration()
+ .setInt(ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1);
+ int writeCount = records.size();
+ recsToLoad.clear();
+ recsToLoad.addAll(records);
+ createInputFile(path, writeCount);
+ // input/output settings
+ HCatWriterMapper.setWrittenRecordCount(0);
+
+ FileInputFormat.setInputPaths(job, path);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName,
+ partKeyMap);
+
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+ HCatOutputFormat.setSchema(job, tblSchema);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+ job.setNumReduceTasks(0);
+ SqoopHCatUtilities.addJars(job, new SqoopOptions());
+ boolean success = job.waitForCompletion(true);
+
+ if (!success) {
+ throw new IOException("Loading HCatalog table with test records failed");
+ }
+ utils.invokeOutputCommitterForLocalMode(job);
+ LOG.info("Loaded " + HCatWriterMapper.writtenRecordCount + " records");
+ return recsToLoad;
+ }
+
+ /**
+ * Run a local map-reduce job to read records from an HCatalog table.
+ * @param dbName the HCatalog database to read from (null for the default)
+ * @param tableName the HCatalog table to read
+ * @param filter an optional partition filter, or null
+ * @return the list of records read
+ * @throws Exception
+ */
+ public List<HCatRecord> readHCatRecords(String dbName,
+ String tableName, String filter) throws Exception {
+
+ HCatReaderMapper.setReadRecordCount(0);
+ recsRead.clear();
+
+ // Configuration conf = new Configuration();
+ Job job = new Job(conf, "HCatalog reader job");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatReaderMapper.class);
+ job.getConfiguration()
+ .setInt(ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1);
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ HCatInputFormat.setInput(job, dbName, tableName).setFilter(filter);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setNumReduceTasks(0);
+
+ Path path = new Path(fs.getWorkingDirectory(),
+ "mapreduce/HCatTableIndexOutput");
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+
+ FileOutputFormat.setOutputPath(job, path);
+
+ job.waitForCompletion(true);
+ LOG.info("Read " + HCatReaderMapper.readRecordCount + " records");
+
+ return recsRead;
+ }
+
+ /**
+ * An enumeration type to hold the partition key type of the ColumnGenerator
+ * defined columns.
+ */
+ public enum KeyType {
+ NOT_A_KEY,
+ STATIC_KEY,
+ DYNAMIC_KEY
+ };
+
+ /**
+ * An enumeration type to hold the creation mode of the HCatalog table.
+ */
+ public enum CreateMode {
+ NO_CREATION,
+ CREATE,
+ CREATE_AND_LOAD,
+ };
+
+ /**
+ * When generating data for export tests, each column is generated according
+ * to a ColumnGenerator.
+ */
+ public interface ColumnGenerator {
+ /*
+ * The column name
+ */
+ String getName();
+
+ /**
+ * For a row with id rowNum, what should we write into that HCatalog column
+ * to export?
+ */
+ Object getHCatValue(int rowNum);
+
+ /**
+ * For a row with id rowNum, what should the database return for the given
+ * column's value?
+ */
+ Object getDBValue(int rowNum);
+
+ /** Return the column type to put in the CREATE TABLE statement. */
+ String getDBTypeString();
+
+ /** Return the SqlType for this column. */
+ int getSqlType();
+
+ /** Return the HCat type for this column. */
+ HCatFieldSchema.Type getHCatType();
+
+
+ /**
+ * If the field is a partition key, indicates whether it is part of the static
+ * partitioning specification for imports or exports. Only one key can be a
+ * static partitioning key; after the first column marked as static, the
+ * remaining keys are treated as dynamic even if they are marked static.
+ */
+ KeyType getKeyType();
+ }
+
+ /**
+ * Return the column name for a column index. Each table contains two columns
+ * named 'id' and 'msg', and then an arbitrary number of additional columns
+ * defined by ColumnGenerators. These columns are referenced by idx 0, 1, 2
+ * and so on.
+ * @param idx
+ * the index of the ColumnGenerator in the array passed to
+ * createTable().
+ * @return the name of the column
+ */
+ public static String forIdx(int idx) {
+ return "col" + idx;
+ }
+
+ public static ColumnGenerator colGenerator(final String name,
+ final String dbType, final int sqlType,
+ final HCatFieldSchema.Type hCatType, final Object hCatValue,
+ final Object dbValue, final KeyType keyType) {
+ return new ColumnGenerator() {
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public Object getDBValue(int rowNum) {
+ return dbValue;
+ }
+
+ @Override
+ public Object getHCatValue(int rowNum) {
+ return hCatValue;
+ }
+
+ @Override
+ public String getDBTypeString() {
+ return dbType;
+ }
+
+ @Override
+ public int getSqlType() {
+ return sqlType;
+ }
+
+ @Override
+ public HCatFieldSchema.Type getHCatType() {
+ return hCatType;
+ }
+
+ @Override
+ public KeyType getKeyType() {
+ return keyType;
+ }
+
+ };
+ }
+
+ public static void assertEquals(Object expectedVal,
+ Object actualVal) {
+
+ if (expectedVal != null && expectedVal instanceof byte[]) {
+ Assert
+ .assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
+ } else {
+ if (expectedVal instanceof Float) {
+ if (actualVal instanceof Double) {
+ Assert.assertEquals(((Float) expectedVal).floatValue(),
+ ((Double) actualVal).doubleValue(), DELTAVAL);
+ } else {
+ Assert
+ .assertEquals("Got unexpected column value", expectedVal,
+ actualVal);
+ }
+ } else if (expectedVal instanceof Double) {
+ if (actualVal instanceof Float) {
+ Assert.assertEquals(((Double) expectedVal).doubleValue(),
+ ((Float) actualVal).doubleValue(), DELTAVAL);
+ } else {
+ Assert
+ .assertEquals("Got unexpected column value", expectedVal,
+ actualVal);
+ }
+ } else {
+ Assert
+ .assertEquals("Got unexpected column value", expectedVal,
+ actualVal);
+ }
+ }
+ }
+
+ /**
+ * Verify that on a given row, a column has a given value.
+ *
+ * @param id
+ * the id column specifying the row to test.
+ */
+ public void assertSqlColValForRowId(Connection conn,
+ String table, int id, String colName,
+ Object expectedVal) throws SQLException {
+ LOG.info("Verifying column " + colName + " has value " + expectedVal);
+
+ PreparedStatement statement = conn.prepareStatement(
+ "SELECT " + colName + " FROM " + table + " WHERE id = " + id,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ Object actualVal = null;
+ try {
+ ResultSet rs = statement.executeQuery();
+ try {
+ rs.next();
+ actualVal = rs.getObject(1);
+ } finally {
+ rs.close();
+ }
+ } finally {
+ statement.close();
+ }
+
+ assertEquals(expectedVal, actualVal);
+ }
+
+ /**
+ * Verify that on a given row, a column has a given value.
+ *
+ * @param id
+ * the id column specifying the row to test.
+ */
+ public static void assertHCatColValForRowId(List<HCatRecord> recs,
+ HCatSchema schema, int id, String fieldName,
+ Object expectedVal) throws IOException {
+ LOG.info("Verifying field " + fieldName + " has value " + expectedVal);
+
+ Object actualVal = null;
+ for (HCatRecord rec : recs) {
+ if (rec.getInteger("id", schema).equals(id)) {
+ actualVal = rec.get(fieldName, schema);
+ break;
+ }
+ }
+ if (actualVal == null) {
+ throw new IOException("No record found with id = " + id);
+ }
+ if (expectedVal != null && expectedVal instanceof byte[]) {
+ Assert
+ .assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
+ } else {
+ if (expectedVal instanceof Float) {
+ if (actualVal instanceof Double) {
+ Assert.assertEquals(((Float) expectedVal).floatValue(),
+ ((Double) actualVal).doubleValue(), DELTAVAL);
+ } else {
+ Assert
+ .assertEquals("Got unexpected column value", expectedVal,
+ actualVal);
+ }
+ } else if (expectedVal instanceof Double) {
+ if (actualVal instanceof Float) {
+ Assert.assertEquals(((Double) expectedVal).doubleValue(),
+ ((Float) actualVal).doubleValue(), DELTAVAL);
+ } else {
+ Assert
+ .assertEquals("Got unexpected column value", expectedVal,
+ actualVal);
+ }
+ } else {
+ Assert
+ .assertEquals("Got unexpected column value", expectedVal,
+ actualVal);
+ }
+ }
+ }
+
+ /**
+ * Return a SQL statement that drops a table, if it exists.
+ *
+ * @param tableName
+ * the table to drop.
+ * @return the SQL statement to drop that table.
+ */
+ public static String getSqlDropTableStatement(String tableName) {
+ return "DROP TABLE " + tableName + " IF EXISTS";
+ }
+
+ public static String getSqlCreateTableStatement(String tableName,
+ ColumnGenerator... extraCols) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE ");
+ sb.append(tableName);
+ sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+ int colNum = 0;
+ for (ColumnGenerator gen : extraCols) {
+ sb.append(", " + forIdx(colNum++) + " " + gen.getDBTypeString());
+ }
+ sb.append(")");
+ String cmd = sb.toString();
+ LOG.debug("Generated SQL create table command : " + cmd);
+ return cmd;
+ }
+
+ public static String getSqlInsertTableStatement(String tableName,
+ ColumnGenerator... extraCols) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("INSERT INTO ");
+ sb.append(tableName);
+ sb.append(" (id, msg");
+ int colNum = 0;
+ for (ColumnGenerator gen : extraCols) {
+ sb.append(", " + forIdx(colNum++));
+ }
+ sb.append(") VALUES ( ?, ?");
+ for (int i = 0; i < extraCols.length; ++i) {
+ sb.append(",?");
+ }
+ sb.append(")");
+ String s = sb.toString();
+ LOG.debug("Generated SQL insert table command : " + s);
+ return s;
+ }
+
+ public void createSqlTable(Connection conn, boolean generateOnly,
+ int count, String table, ColumnGenerator... extraCols)
+ throws Exception {
+ PreparedStatement statement = conn.prepareStatement(
+ getSqlDropTableStatement(table),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+ statement = conn.prepareStatement(
+ getSqlCreateTableStatement(table, extraCols),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+ if (!generateOnly) {
+ loadSqlTable(conn, table, count, extraCols);
+ }
+ }
+
+ public HCatSchema createHCatTable(CreateMode mode, int count,
+ String table, ColumnGenerator... extraCols)
+ throws Exception {
+ HCatSchema hCatTblSchema = generateHCatTableSchema(extraCols);
+ HCatSchema hCatPartSchema = generateHCatPartitionSchema(extraCols);
+ HCatSchema hCatFullSchema = new HCatSchema(hCatTblSchema.getFields());
+ for (HCatFieldSchema hfs : hCatPartSchema.getFields()) {
+ hCatFullSchema.append(hfs);
+ }
+ if (mode != CreateMode.NO_CREATION) {
+
+ createHCatTableUsingSchema(null, table,
+ hCatTblSchema.getFields(), hCatPartSchema.getFields());
+ if (mode == CreateMode.CREATE_AND_LOAD) {
+ HCatSchema hCatLoadSchema = new HCatSchema(hCatTblSchema.getFields());
+ HCatSchema dynPartSchema =
+ generateHCatDynamicPartitionSchema(extraCols);
+ for (HCatFieldSchema hfs : dynPartSchema.getFields()) {
+ hCatLoadSchema.append(hfs);
+ }
+ loadHCatTable(hCatLoadSchema, table, count, extraCols);
+ }
+ }
+ return hCatFullSchema;
+ }
+
+ private void loadHCatTable(HCatSchema hCatSchema, String table,
+ int count, ColumnGenerator... extraCols)
+ throws Exception {
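+ // Collect the static partition key/value pairs (if any) from the column
+ // generators; they are passed separately from the generated records below.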
+ Map<String, String> staticKeyMap = new HashMap<String, String>();
+ for (ColumnGenerator col : extraCols) {
+ if (col.getKeyType() == KeyType.STATIC_KEY) {
+ staticKeyMap.put(col.getName(), (String) col.getHCatValue(0));
+ }
+ }
+ loadHCatTable(null, table, staticKeyMap,
+ hCatSchema, generateHCatRecords(count, hCatSchema, extraCols));
+ }
+
+ private void loadSqlTable(Connection conn, String table, int count,
+ ColumnGenerator... extraCols) throws Exception {
+ PreparedStatement statement = conn.prepareStatement(
+ getSqlInsertTableStatement(table, extraCols),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ for (int i = 0; i < count; ++i) {
+ statement.setObject(1, i, Types.INTEGER);
+ statement.setObject(2, "textfield" + i, Types.VARCHAR);
+ for (int j = 0; j < extraCols.length; ++j) {
+ statement.setObject(j + 3, extraCols[j].getDBValue(i),
+ extraCols[j].getSqlType());
+ }
+ statement.executeUpdate();
+ }
+ if (!conn.getAutoCommit()) {
+ conn.commit();
+ }
+ } finally {
+ statement.close();
+ }
+ }
+
+ private HCatSchema generateHCatTableSchema(ColumnGenerator... extraCols)
+ throws Exception {
+ List<HCatFieldSchema> hCatTblCols = new ArrayList<HCatFieldSchema>();
+ hCatTblCols.clear();
+ hCatTblCols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, ""));
+ hCatTblCols
+ .add(new HCatFieldSchema("msg", HCatFieldSchema.Type.STRING, ""));
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getKeyType() == KeyType.NOT_A_KEY) {
+ hCatTblCols
+ .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
+ }
+ }
+ HCatSchema hCatTblSchema = new HCatSchema(hCatTblCols);
+ return hCatTblSchema;
+ }
+
+ private HCatSchema generateHCatPartitionSchema(ColumnGenerator... extraCols)
+ throws Exception {
+ List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
+
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getKeyType() != KeyType.NOT_A_KEY) {
+ hCatPartCols
+ .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
+ }
+ }
+ HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
+ return hCatPartSchema;
+ }
+
+ private HCatSchema generateHCatDynamicPartitionSchema(
+ ColumnGenerator... extraCols) throws Exception {
+ List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
+ hCatPartCols.clear();
+ boolean staticFound = false;
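+ // Only the first column marked STATIC_KEY is treated as the static
+ // partition; any remaining partition keys are considered dynamic.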
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getKeyType() != KeyType.NOT_A_KEY) {
+ if (gen.getKeyType() == KeyType.STATIC_KEY && !staticFound) {
+ staticFound = true;
+ continue;
+ }
+ hCatPartCols
+ .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
+ }
+ }
+ HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
+ return hCatPartSchema;
+
+ }
+
+ private HCatSchema generateHCatStaticPartitionSchema(
+ ColumnGenerator... extraCols) throws Exception {
+ List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
+ hCatPartCols.clear();
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getKeyType() == KeyType.STATIC_KEY) {
+ hCatPartCols
+ .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
+ break;
+ }
+ }
+ HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
+ return hCatPartSchema;
+ }
+
+ private List<HCatRecord> generateHCatRecords(int numRecords,
+ HCatSchema hCatTblSchema, ColumnGenerator... extraCols) throws Exception {
+ List<HCatRecord> records = new ArrayList<HCatRecord>();
+ List<HCatFieldSchema> hCatTblCols = hCatTblSchema.getFields();
+ int size = hCatTblCols.size();
+ for (int i = 0; i < numRecords; ++i) {
+ DefaultHCatRecord record = new DefaultHCatRecord(size);
+ record.set(hCatTblCols.get(0).getName(), hCatTblSchema, i);
+ record.set(hCatTblCols.get(1).getName(), hCatTblSchema, "textfield" + i);
+ boolean staticFound = false;
+ int idx = 0;
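+ // Skip the first static partition key; its value is supplied through the
+ // static partition specification rather than in the record itself.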
+ for (int j = 0; j < extraCols.length; ++j) {
+ if (extraCols[j].getKeyType() == KeyType.STATIC_KEY
+ && !staticFound) {
+ staticFound = true;
+ continue;
+ }
+ record.set(hCatTblCols.get(idx + 2).getName(), hCatTblSchema,
+ extraCols[j].getHCatValue(i));
+ ++idx;
+ }
+
+ records.add(record);
+ }
+ return records;
+ }
+
+ public String hCatRecordDump(List<HCatRecord> recs,
+ HCatSchema schema) throws Exception {
+ List<String> fields = schema.getFieldNames();
+ int count = 0;
+ StringBuilder sb = new StringBuilder(1024);
+ for (HCatRecord rec : recs) {
+ sb.append("HCat Record : " + ++count).append('\n');
+ for (String field : fields) {
+ sb.append('\t').append(field).append('=');
+ sb.append(rec.get(field, schema)).append('\n');
+ sb.append("\n\n");
+ }
+ }
+ return sb.toString();
+ }
+
+ public Map<String, String> getAddlTestArgs() {
+ String addlArgs = System.getProperty(SQOOP_HCATALOG_TEST_ARGS);
+ Map<String, String> addlArgsMap = new HashMap<String, String>();
+ if (addlArgs != null) {
+ String[] argsArray = addlArgs.split(",");
+ for (String s : argsArray) {
+ String[] keyVal = s.split("=");
+ if (keyVal.length == 2) {
+ addlArgsMap.put(keyVal[0], keyVal[1]);
+ } else {
+ LOG.info("Ignoring malformed addl arg " + s);
+ }
+ }
+ }
+ return addlArgsMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/test/org/apache/sqoop/hcat/TestHCatalogBasic.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hcat/TestHCatalogBasic.java b/src/test/org/apache/sqoop/hcat/TestHCatalogBasic.java
new file mode 100644
index 0000000..da803d0
--- /dev/null
+++ b/src/test/org/apache/sqoop/hcat/TestHCatalogBasic.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hcat;
+
+import junit.framework.TestCase;
+
+import org.junit.Before;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.tool.ExportTool;
+import com.cloudera.sqoop.tool.ImportTool;
+
+/**
+ * Test basic HCatalog related features.
+ */
+public class TestHCatalogBasic extends TestCase {
+ private static ImportTool importTool;
+ private static ExportTool exportTool;
+
+ @Before
+ @Override
+ public void setUp() {
+ importTool = new ImportTool();
+ exportTool = new ExportTool();
+ }
+ private SqoopOptions parseImportArgs(String[] argv) throws Exception {
+ SqoopOptions opts = importTool.parseArguments(argv, null, null, false);
+ return opts;
+ }
+
+ private SqoopOptions parseExportArgs(String[] argv) throws Exception {
+ SqoopOptions opts = exportTool.parseArguments(argv, null, null, false);
+ return opts;
+ }
+
+ public void testHCatalogHomeWithImport() throws Exception {
+ String[] args = {
+ "--hcatalog-home",
+ "/usr/lib/hcatalog",
+ };
+
+ SqoopOptions opts = parseImportArgs(args);
+ }
+
+ public void testHCatalogHomeWithExport() throws Exception {
+ String[] args = {
+ "--hcatalog-home",
+ "/usr/lib/hcatalog",
+ };
+
+ SqoopOptions opts = parseExportArgs(args);
+ }
+
+ public void testHCatalogImport() throws Exception {
+ String[] args = {
+ "--hcatalog-table",
+ "table",
+ };
+
+ SqoopOptions opts = parseImportArgs(args);
+ }
+
+ public void testHCatalogExport() throws Exception {
+ String[] args = {
+ "--hcatalog-table",
+ "table",
+ };
+
+ SqoopOptions opts = parseExportArgs(args);
+ }
+
+ public void testHCatImportWithTargetDir() throws Exception {
+ String[] args = {
+ "--connect",
+ "jdbc:db:url",
+ "--table",
+ "dbtable",
+ "--hcatalog-table",
+ "table",
+ "--target-dir",
+ "/target/dir",
+ };
+ try {
+ SqoopOptions opts = parseImportArgs(args);
+ importTool.validateOptions(opts);
+ fail("Expected InvalidOptionsException");
+ } catch (SqoopOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testHCatImportWithWarehouseDir() throws Exception {
+ String[] args = {
+ "--connect",
+ "jdbc:db:url",
+ "--table",
+ "dbtable",
+ "--hcatalog-table",
+ "table",
+ "--warehouse-dir",
+ "/target/dir",
+ };
+ try {
+ SqoopOptions opts = parseImportArgs(args);
+ importTool.validateOptions(opts);
+ fail("Expected InvalidOptionsException");
+ } catch (SqoopOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testHCatImportWithHiveImport() throws Exception {
+ String[] args = {
+ "--connect",
+ "jdbc:db:url",
+ "--table",
+ "dbtable",
+ "--hcatalog-table",
+ "table",
+ "--hive-import",
+ };
+ try {
+ SqoopOptions opts = parseImportArgs(args);
+ importTool.validateOptions(opts);
+ fail("Expected InvalidOptionsException");
+ } catch (SqoopOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testHCatExportWithExportDir() throws Exception {
+ String[] args = {
+ "--connect",
+ "jdbc:db:url",
+ "--table",
+ "dbtable",
+ "--hcatalog-table",
+ "table",
+ "--export-dir",
+ "/export/dir",
+ };
+ try {
+ SqoopOptions opts = parseExportArgs(args);
+ exportTool.validateOptions(opts);
+ fail("Expected InvalidOptionsException");
+ } catch (SqoopOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testHCatImportWithDirect() throws Exception {
+ String[] args = {
+ "--connect",
+ "jdbc:db:url",
+ "--table",
+ "dbtable",
+ "--hcatalog-table",
+ "table",
+ "--direct",
+ };
+ try {
+ SqoopOptions opts = parseImportArgs(args);
+ importTool.validateOptions(opts);
+ fail("Expected InvalidOptionsException");
+ } catch (SqoopOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testHCatImportWithSequenceFile() throws Exception {
+ String[] args = {
+ "--connect",
+ "jdbc:db:url",
+ "--table",
+ "dbtable",
+ "--hcatalog-table",
+ "table",
+ "--as-sequencefile"
+ };
+ try {
+ SqoopOptions opts = parseImportArgs(args);
+ importTool.validateOptions(opts);
+ fail("Expected InvalidOptionsException");
+ } catch (SqoopOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testHCatImportWithAvroFile() throws Exception {
+ String[] args = {
+ "--connect",
+ "jdbc:db:url",
+ "--table",
+ "dbtable",
+ "--hcatalog-table",
+ "table",
+ "--as-avrofile"
+ };
+ try {
+ SqoopOptions opts = parseImportArgs(args);
+ importTool.validateOptions(opts);
+ fail("Expected InvalidOptionsException");
+ } catch (SqoopOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+ public void testHCatImportWithCreateTable() throws Exception {
+ String[] args = {
+ "--hcatalog-table",
+ "table",
+ "--create-hcatalog-table",
+ };
+ SqoopOptions opts = parseImportArgs(args);
+ }
+
+ public void testHCatImportWithStorageStanza() throws Exception {
+ String[] args = {
+ "--hcatalog-table",
+ "table",
+ "--hcatalog-storage-stanza",
+ "stored as textfile",
+ };
+ SqoopOptions opts = parseImportArgs(args);
+ }
+
+ public void testHCatImportWithDatabase() throws Exception {
+ String[] args = {
+ "--hcatalog-table",
+ "table",
+ "--hcatalog-database",
+ "default",
+ };
+ SqoopOptions opts = parseImportArgs(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/testdata/hcatalog/conf/hive-log4j.properties
----------------------------------------------------------------------
diff --git a/testdata/hcatalog/conf/hive-log4j.properties b/testdata/hcatalog/conf/hive-log4j.properties
new file mode 100644
index 0000000..7fa0546
--- /dev/null
+++ b/testdata/hcatalog/conf/hive-log4j.properties
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+
+sqoop.root.logger=DEBUG,console,DRFA
+hive.root.logger=DEBUG,console,DRFA
+hcatalog.root.logger=DEBUG,console,DRFA
+sqoop.log.dir=${user.dir}/sqoop/logs
+hive.log.dir=${user.dir}/sqoop/logs/
+sqoop.log.file=sqoop.log
+hive.log.file=hive.log
+org.apache.sqoop=DEBUG, console
+org.apache.hadoop=DEBUG, console
+org.apache.hive=DEBUG, console
+org.apache.hcatalog=DEBUG, console
+
+# Define the root logger to the system property "sqoop.root.logger".
+log4j.rootLogger=${sqoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=WARN
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
+
+
+log4j.category.DataNucleus=INFO,DRFA
+log4j.category.Datastore=INFO,DRFA
+log4j.category.Datastore.Schema=INFO,DRFA
+log4j.category.JPOX.Datastore=INFO,DRFA
+log4j.category.JPOX.Plugin=INFO,DRFA
+log4j.category.JPOX.MetaData=INFO,DRFA
+log4j.category.JPOX.Query=INFO,DRFA
+log4j.category.JPOX.General=INFO,DRFA
+log4j.category.JPOX.Enhancer=INFO,DRFA
+log4j.logger.org.apache.hadoop.conf.Configuration=INFO,DRFA
+
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/testdata/hcatalog/conf/hive-site.xml
----------------------------------------------------------------------
diff --git a/testdata/hcatalog/conf/hive-site.xml b/testdata/hcatalog/conf/hive-site.xml
new file mode 100644
index 0000000..c84af28
--- /dev/null
+++ b/testdata/hcatalog/conf/hive-site.xml
@@ -0,0 +1,26 @@
+<configuration>
+ <property>
+ <name>hive.metastore.local</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hive.metastore.warehouse.dir</name>
+ <value>${test.build.data}/sqoop/warehouse</value>
+ </property>
+ <property>
+ <name>hive.metastore.uris</name>
+ <value></value>
+ </property>
+ <property>
+ <name>javax.jdo.option.ConnectionURL</name>
+ <value>jdbc:derby:;databaseName=${test.build.data}/sqoop/metastore_db;create=true</value>
+ </property>
+ <property>
+ <name>javax.jdo.option.ConnectionDriverName</name>
+ <value>org.apache.derby.jdbc.EmbeddedDriver</value>
+ </property>
+ <property>
+ <name>hive.querylog.location</name>
+ <value>${test.build.data}/sqoop/logs</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/testdata/hcatalog/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/testdata/hcatalog/conf/log4j.properties b/testdata/hcatalog/conf/log4j.properties
new file mode 100644
index 0000000..370fbfa
--- /dev/null
+++ b/testdata/hcatalog/conf/log4j.properties
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.sqoop=DEBUG, console
+org.apache.hadoop=DEBUG, console
+org.apache.hive=DEBUG, console
+org.apache.hcatalog=DEBUG, console
+
+
+sqoop.root.logger=DEBUG,console,DRFA
+hive.root.logger=DEBUG,console,DRFA
+hcatalog.root.logger=DEBUG,console,DRFA
+sqoop.log.dir=${user.dir}/sqoop/logs
+sqoop.log.file=sqoop.log
+
+
+
+# Define the root logger to the system property "sqoop.root.logger".
+log4j.rootLogger=${sqoop.root.logger}
+
+#
+# DRFA
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${sqoop.log.dir}/${sqoop.log.file}
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.DRFA.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
[3/3] git commit: SQOOP-931: Integrate HCatalog with Sqoop
Posted by ja...@apache.org.
SQOOP-931: Integrate HCatalog with Sqoop
(Venkat Ranganathan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5e88d43b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5e88d43b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5e88d43b
Branch: refs/heads/trunk
Commit: 5e88d43b5af024c1b9fd82029f7de4c325dcf009
Parents: b07906a
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Jun 7 07:33:21 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Jun 7 07:33:21 2013 -0700
----------------------------------------------------------------------
bin/configure-sqoop | 30 +
bin/configure-sqoop.cmd | 9 +
build.xml | 21 +-
ivy.xml | 18 +-
ivy/ivysettings.xml | 15 +-
src/docs/user/SqoopUserGuide.txt | 2 +
src/docs/user/hcatalog.txt | 313 ++++
src/java/org/apache/sqoop/SqoopOptions.java | 107 ++-
.../sqoop/config/ConfigurationConstants.java | 17 +
src/java/org/apache/sqoop/hive/HiveImport.java | 17 +
src/java/org/apache/sqoop/manager/ConnManager.java | 64 +
.../sqoop/mapreduce/DataDrivenImportJob.java | 16 +
.../org/apache/sqoop/mapreduce/ExportJobBase.java | 20 +-
.../org/apache/sqoop/mapreduce/ImportJobBase.java | 13 +
.../org/apache/sqoop/mapreduce/JdbcExportJob.java | 13 +-
src/java/org/apache/sqoop/mapreduce/JobBase.java | 4 +-
.../mapreduce/hcat/SqoopHCatExportFormat.java | 138 ++
.../mapreduce/hcat/SqoopHCatExportMapper.java | 349 +++++
.../mapreduce/hcat/SqoopHCatImportMapper.java | 343 ++++
.../sqoop/mapreduce/hcat/SqoopHCatInputSplit.java | 109 ++
.../mapreduce/hcat/SqoopHCatRecordReader.java | 153 ++
.../sqoop/mapreduce/hcat/SqoopHCatUtilities.java | 1215 +++++++++++++++
src/java/org/apache/sqoop/tool/BaseSqoopTool.java | 231 +++-
src/java/org/apache/sqoop/tool/CodeGenTool.java | 3 +
src/java/org/apache/sqoop/tool/ExportTool.java | 9 +-
src/java/org/apache/sqoop/tool/ImportTool.java | 14 +-
src/perftest/ExportStressTest.java | 2 +-
src/test/com/cloudera/sqoop/ThirdPartyTests.java | 7 +
.../com/cloudera/sqoop/hive/TestHiveImport.java | 10 +
.../cloudera/sqoop/testutil/BaseSqoopTestCase.java | 2 +-
.../cloudera/sqoop/testutil/ExportJobTestCase.java | 14 +-
.../org/apache/sqoop/hcat/HCatalogExportTest.java | 377 +++++
.../org/apache/sqoop/hcat/HCatalogImportTest.java | 712 +++++++++
.../org/apache/sqoop/hcat/HCatalogTestUtils.java | 855 ++++++++++
.../org/apache/sqoop/hcat/TestHCatalogBasic.java | 251 +++
testdata/hcatalog/conf/hive-log4j.properties | 87 +
testdata/hcatalog/conf/hive-site.xml | 26 +
testdata/hcatalog/conf/log4j.properties | 55 +
38 files changed, 5596 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/bin/configure-sqoop
----------------------------------------------------------------------
diff --git a/bin/configure-sqoop b/bin/configure-sqoop
index 61ff3f2..178720d 100755
--- a/bin/configure-sqoop
+++ b/bin/configure-sqoop
@@ -54,9 +54,22 @@ if [ -z "${HADOOP_MAPRED_HOME}" ]; then
HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
fi
fi
+
+# We are setting HADOOP_HOME to HADOOP_COMMON_HOME if it is not set
+# so that hcat script works correctly on BigTop
+if [ -z "${HADOOP_HOME}" ]; then
+ if [ -n "${HADOOP_COMMON_HOME}" ]; then
+ HADOOP_HOME=${HADOOP_COMMON_HOME}
+ export HADOOP_HOME
+ fi
+fi
+
if [ -z "${HBASE_HOME}" ]; then
HBASE_HOME=/usr/lib/hbase
fi
+if [ -z "${HCAT_HOME}" ]; then
+ HCAT_HOME=/usr/lib/hcatalog
+fi
# Check: If we can't find our dependencies, give up here.
if [ ! -d "${HADOOP_COMMON_HOME}" ]; then
@@ -76,6 +89,12 @@ if [ ! -d "${HBASE_HOME}" ]; then
echo 'Please set $HBASE_HOME to the root of your HBase installation.'
fi
+## Moved to be a runtime check in sqoop.
+if [ ! -d "${HCAT_HOME}" ]; then
+ echo "Warning: $HCAT_HOME does not exist! HCatalog jobs will fail."
+ echo 'Please set $HCAT_HOME to the root of your HCatalog installation.'
+fi
+
# Where to find the main Sqoop jar
SQOOP_JAR_DIR=$SQOOP_HOME
@@ -106,6 +125,15 @@ if [ -e "$HBASE_HOME/bin/hbase" ]; then
SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
fi
+# Add HCatalog to dependency list
+if [ -e "${HCAT_HOME}/bin/hcat" ]; then
+ TMP_SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:`${HCAT_HOME}/bin/hcat -classpath`
+ if [ -z "${HIVE_CONF_DIR}" ]; then
+ TMP_SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}:${HIVE_CONF_DIR}
+ fi
+ SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
+fi
+
ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
if [ -d "${ZOOCFGDIR}" ]; then
SQOOP_CLASSPATH=$ZOOCFGDIR:$SQOOP_CLASSPATH
@@ -136,4 +164,6 @@ export HADOOP_CLASSPATH
export HADOOP_COMMON_HOME
export HADOOP_MAPRED_HOME
export HBASE_HOME
+export HCAT_HOME
+export HIVE_CONF_DIR
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/bin/configure-sqoop.cmd
----------------------------------------------------------------------
diff --git a/bin/configure-sqoop.cmd b/bin/configure-sqoop.cmd
index f5fd608..ec57e37 100644
--- a/bin/configure-sqoop.cmd
+++ b/bin/configure-sqoop.cmd
@@ -55,6 +55,15 @@ if not defined HADOOP_MAPRED_HOME (
exit /b 1
)
)
+
+:: We are setting HADOOP_HOME to HADOOP_COMMON_HOME if it is not set
+:: so that hcat script works correctly on BigTop
+if not defined HADOOP_HOME (
+ if defined HADOOP_COMMON_HOME (
+ set HADOOP_HOME=%HADOOP_COMMON_HOME%
+ )
+)
+
:: Check for HBase dependency
if not defined HBASE_HOME (
if defined HBASE_VERSION (
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 636c103..b4b08e5 100644
--- a/build.xml
+++ b/build.xml
@@ -51,6 +51,7 @@
<property name="hbase.version" value="0.90.3-cdh3u1" />
<property name="zookeeper.version" value="3.3.3-cdh3u1" />
<property name="hadoop.version.full" value="0.20" />
+ <property name="hcatalog.version" value="0.11.0" />
</then>
<elseif>
@@ -60,6 +61,7 @@
<property name="hbase.version" value="0.92.0" />
<property name="zookeeper.version" value="3.4.2" />
<property name="hadoop.version.full" value="0.23" />
+ <property name="hcatalog.version" value="0.11.0" />
</then>
</elseif>
@@ -70,6 +72,7 @@
<property name="hbase.version" value="0.92.0" />
<property name="zookeeper.version" value="3.4.2" />
<property name="hadoop.version.full" value="1.0.0" />
+ <property name="hcatalog.version" value="0.11.0" />
</then>
</elseif>
@@ -80,6 +83,7 @@
<property name="hbase.version" value="0.94.2" />
<property name="zookeeper.version" value="3.4.2" />
<property name="hadoop.version.full" value="2.0.4-alpha" />
+ <property name="hcatalog.version" value="0.11.0" />
</then>
</elseif>
@@ -600,6 +604,7 @@
<tarfileset dir="${build.dir}" mode="755">
<include name="${bin.artifact.name}/bin/*" />
<include name="${bin.artifact.name}/testdata/hive/bin/*" />
+ <include name="${bin.artifact.name}/testdata/hcatalog/conf/*" />
<include name="${bin.artifact.name}/**/*.sh" />
</tarfileset>
</tar>
@@ -643,12 +648,14 @@
<tarfileset dir="${build.dir}" mode="664">
<exclude name="${src.artifact.name}/bin/*" />
<exclude name="${src.artifact.name}/testdata/hive/bin/*" />
+ <exclude name="${src.artifact.name}/testdata/hcatalog/conf/*" />
<exclude name="${src.artifact.name}/**/*.sh" />
<include name="${src.artifact.name}/**" />
</tarfileset>
<tarfileset dir="${build.dir}" mode="755">
<include name="${src.artifact.name}/bin/*" />
<include name="${src.artifact.name}/testdata/hive/bin/*" />
+ <include name="${src.artifact.name}/testdata/hcatalog/conf/*" />
<include name="${src.artifact.name}/**/*.sh" />
</tarfileset>
</tar>
@@ -658,6 +665,9 @@
<target name="test-prep" depends="test-prep-normal,test-prep-thirdparty,
test-prep-manual"/>
+ <path id="hcatalog.conf.dir">
+ <pathelement location="${basedir}/testdata/hcatalog/conf"/>
+ </path>
<target name="test-eval-condition">
<condition property="thirdparty_or_manual">
<or>
@@ -667,6 +677,8 @@
</condition>
</target>
+
+
<target name="test-prep-normal" unless="thirdparty_or_manual"
depends="test-eval-condition">
<!-- Set this to run all the "standard" tests -->
@@ -712,7 +724,7 @@
<delete dir="${test.log.dir}"/>
<mkdir dir="${test.log.dir}"/>
<delete dir="${build.test}/data"/>
- <mkdir dir="${build.test}/data" />
+ <mkdir dir="${build.test}/data/sqoop" />
<mkdir dir="${cobertura.class.dir}" />
<junit
printsummary="yes" showoutput="${test.output}"
@@ -803,10 +815,17 @@
<sysproperty key="java.security.krb5.kdc"
value="${java.security.krb5.kdc}"/>
+ <!-- Location of Hive logs -->
+ <!--<sysproperty key="hive.log.dir"
+ value="${test.build.data}/sqoop/logs"/> -->
+
<classpath>
<!-- instrumented classes go ahead of normal classes -->
<pathelement location="${cobertura.class.dir}" />
+ <!-- Location of hive-site xml and other hadoop config files -->
+ <path refid="hcatalog.conf.dir" />
+
<!-- main classpath here. -->
<path refid="test.classpath" />
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 1fa4dd1..750adfc 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -37,10 +37,15 @@ under the License.
extends="runtime"
description="artifacts needed to compile/test the application"/>
<conf name="hbase" visibility="private" />
- <conf name="hadoop23" visibility="private" extends="common,runtime,hbase" />
- <conf name="hadoop20" visibility="private" extends="common,runtime,hbase" />
- <conf name="hadoop100" visibility="private" extends="common,runtime,hbase" />
- <conf name="hadoop200" visibility="private" extends="common,runtime,hbase" />
+ <conf name="hcatalog" visibility="private" />
+ <conf name="hadoop23" visibility="private"
+ extends="common,runtime,hbase,hcatalog" />
+ <conf name="hadoop20" visibility="private"
+ extends="common,runtime,hbase,hcatalog" />
+ <conf name="hadoop100" visibility="private"
+ extends="common,runtime,hbase,hcatalog" />
+ <conf name="hadoop200" visibility="private"
+ extends="common,runtime,hbase,hcatalog" />
<conf name="test" visibility="private" extends="common,runtime"/>
<conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@@ -172,6 +177,11 @@ under the License.
<exclude org="com.cloudera.cdh" module="zookeeper-ant" />
</dependency>
+ <dependency org="org.apache.hcatalog" name="hcatalog-core"
+ rev="${hcatalog.version}" conf="hcatalog->default">
+ <artifact name="hcatalog-core" type="jar"/>
+ </dependency>
+
<exclude org="org.apache.hadoop" module="avro"/>
<exclude org="commons-daemon" module="commons-daemon" />
<exclude type="pom" />
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/ivy/ivysettings.xml
----------------------------------------------------------------------
diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml
index c4cc561..2920c89 100644
--- a/ivy/ivysettings.xml
+++ b/ivy/ivysettings.xml
@@ -42,6 +42,9 @@ under the License.
<property name="releases.cloudera.com"
value="https://repository.cloudera.com/content/repositories/releases/"
override="false"/>
+ <property name="www.datanucleus.org"
+ value="http://www.datanucleus.org/downloads/maven2/"
+ override="false"/>
<property name="maven2.pattern"
value="[organisation]/[module]/[revision]/[artifact]-[revision](-[classifier])"/>
<property name="repo.dir" value="${user.home}/.m2/repository"/>
@@ -52,6 +55,8 @@ under the License.
<resolvers>
<ibiblio name="maven2" root="${repo.maven.org}"
pattern="${maven2.pattern.ext}" m2compatible="true"/>
+ <ibiblio name="datanucleus" root="${www.datanucleus.org}"
+ pattern="${maven2.pattern.ext}" m2compatible="true"/>
<ibiblio name="cloudera-releases" root="${releases.cloudera.com}"
pattern="${maven2.pattern.ext}" m2compatible="true"/>
<ibiblio name="apache-snapshot" root="${snapshot.apache.org}"
@@ -67,16 +72,18 @@ under the License.
<chain name="default" dual="true" checkmodified="true"
changingPattern=".*SNAPSHOT">
<resolver ref="fs"/>
- <resolver ref="apache-snapshot"/>
+ <resolver ref="apache-snapshot"/>
+ <resolver ref="datanucleus"/>
<resolver ref="cloudera-releases"/>
- <resolver ref="cloudera-staging"/>
+ <resolver ref="cloudera-staging"/>
<resolver ref="maven2"/>
</chain>
<chain name="internal" dual="true">
<resolver ref="fs"/>
- <resolver ref="apache-snapshot"/>
- <resolver ref="cloudera-staging"/>
+ <resolver ref="apache-snapshot"/>
+ <resolver ref="datanucleus"/>
+ <resolver ref="cloudera-staging"/>
<resolver ref="maven2"/>
</chain>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/docs/user/SqoopUserGuide.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/SqoopUserGuide.txt b/src/docs/user/SqoopUserGuide.txt
index 01ac1cf..2e88887 100644
--- a/src/docs/user/SqoopUserGuide.txt
+++ b/src/docs/user/SqoopUserGuide.txt
@@ -72,6 +72,8 @@ include::help.txt[]
include::version.txt[]
+include::hcatalog.txt[]
+
include::compatibility.txt[]
include::connectors.txt[]
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/docs/user/hcatalog.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hcatalog.txt b/src/docs/user/hcatalog.txt
new file mode 100644
index 0000000..b8e495e
--- /dev/null
+++ b/src/docs/user/hcatalog.txt
@@ -0,0 +1,313 @@
+
+////
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+////
+
+Sqoop-HCatalog Integration
+--------------------------
+
+HCatalog Background
+~~~~~~~~~~~~~~~~~~~
+
+HCatalog is a table and storage management service for Hadoop that enables
+users with different data processing tools – Pig, MapReduce, and Hive –
+to more easily read and write data on the grid. HCatalog’s table abstraction
+presents users with a relational view of data in the Hadoop distributed
+file system (HDFS) and ensures that users need not worry about where or
+in what format their data is stored: RCFile format, text files, or
+SequenceFiles.
+
+HCatalog supports reading and writing files in any format for which a Hive
+SerDe (serializer-deserializer) has been written. By default, HCatalog
+supports RCFile, CSV, JSON, and SequenceFile formats. To use a custom
+format, you must provide the InputFormat and OutputFormat as well as the SerDe.
+
+The ability of HCatalog to abstract various storage formats is used in
+providing the RCFile (and future file types) support to Sqoop.
+
+Exposing HCatalog Tables to Sqoop
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog integration with Sqoop is patterned on an existing feature set that
+supports Avro and Hive tables. Five new command line options are introduced,
+and some command line options defined for Hive are reused.
+
+New Command Line Options
+^^^^^^^^^^^^^^^^^^^^^^^^
+
++--hcatalog-database+::
+Specifies the database name for the HCatalog table. If not specified,
+the default database name +default+ is used. Providing the
++--hcatalog-database+ option without +--hcatalog-table+ is an error.
+This is not a required option.
+
++--hcatalog-table+::
+The argument value for this option is the HCatalog tablename.
+The presence of the +--hcatalog-table+ option signifies that the import
+or export job is done using HCatalog tables, and it is a required option for
+HCatalog jobs.
+
++--hcatalog-home+::
+The home directory for the HCatalog installation. The directory is
+expected to have a +lib+ subdirectory and a +share/hcatalog+ subdirectory
+with necessary HCatalog libraries. If not specified, the system property
++hcat.home+ will be checked and, failing that, the system environment
+variable +HCAT_HOME+ will be checked. If none of these are set, the
+default value will be used and currently the default is set to
++/usr/lib/hcatalog+.
+This is not a required option.
+
++--create-hcatalog-table+::
+
+This option specifies whether an HCatalog table should be created
+automatically when importing data. By default, HCatalog tables are assumed
+to exist. The table name will be the same as the database table name
+translated to lower case. Further described in +Automatic Table Creation+
+below.
+
++--hcatalog-storage-stanza+::
+
+This option specifies the storage stanza to be appended to the table.
+Further described in +Automatic Table Creation+ below.
+
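+For example, a hypothetical import into an HCatalog table named +txn+ in a
+database named +sales+ (both names, like the placeholders, are illustrative)
+might combine these options as follows:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name> --hcatalog-database sales --hcatalog-table txn+
+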
+Supported Sqoop Hive Options
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following Sqoop options are also used along with the +--hcatalog-table+
+option to provide additional input to the HCatalog jobs. Some of the existing
+Hive import job options are reused with HCatalog jobs instead of creating
+HCatalog-specific options for the same purpose.
+
++--map-column-hive+::
+This option maps a database column to a specific HCatalog type.
+
++--hive-home+::
+The Hive home location.
+
++--hive-partition-key+::
+Specifies the column used as the static partitioning key. The partitioning
+key should be of type STRING. There can be only one static partitioning key.
+
++--hive-partition-value+::
+The value associated with the partition.
+
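+As an illustration (the connection URL, table name, and partition value below
+are placeholders), a static partition can be selected at import time with:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name> --hcatalog-table txn --hive-partition-key cust_id --hive-partition-value "AB123"+
+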
+Unsupported Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Unsupported Sqoop Hive Import Options
++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop Hive import options are not supported with HCatalog jobs.
+
+* +--hive-import+
+* +--hive-overwrite+
+
+Unsupported Sqoop Export and Import Options
++++++++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop export and import options are not supported with HCatalog jobs.
+
+* +--direct+
+* +--export-dir+
+* +--target-dir+
+* +--warehouse-dir+
+* +--append+
+* +--as-sequencefile+
+* +--as-avrodatafile+
+
+Ignored Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^
+
+The following options are ignored with HCatalog jobs.
+
+* All input delimiter options are ignored.
+
+* Output delimiters are generally ignored unless either
++--hive-drop-import-delims+ or +--hive-delims-replacement+ is used. When the
++--hive-drop-import-delims+ or +--hive-delims-replacement+ option is
+specified, all +CHAR+ type database table columns will be post-processed
+to either remove or replace the delimiters, respectively. See +Delimited Text
+Formats and Field and Line Delimiter Characters+ below. This is only needed
+if the HCatalog table uses text formats.
+
+Automatic Table Creation
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+One of the key features of Sqoop is to manage and create the table metadata
+when importing into Hadoop. HCatalog import jobs also provide for this
+feature with the option +--create-hcatalog-table+. Furthermore, one of the
+important benefits of the HCatalog integration is to provide storage
+agnosticism to Sqoop data movement jobs. To provide for that feature,
+HCatalog import jobs provide an option that lets a user specify the
+storage format for the created table.
+
+The option +--create-hcatalog-table+ is used as an indicator that a table
+has to be created as part of the HCatalog import job. If the option
++--create-hcatalog-table+ is specified and the table exists, then the
+table creation will fail and the job will be aborted.
+
+The option +--hcatalog-storage-stanza+ can be used to specify the storage
+format of the newly created table. The default value for this option is
++stored as rcfile+. The value specified for this option is assumed to be a
+valid Hive storage format expression. It will be appended to the +create table+
+command generated by the HCatalog import job as part of automatic table
+creation. Any error in the storage stanza will cause the table creation to
+fail and the import job will be aborted.
+
+Any additional resources needed to support the storage format referenced in
+the option +--hcatalog-storage-stanza+ should be provided to the job either
+by placing them in +$HIVE_HOME/lib+ or by providing them through the
++HADOOP_CLASSPATH+ environment variable and the +-libjars+ option.
+
+If the option +--hive-partition-key+ is specified, then the value of this
+option is used as the partitioning key for the newly created table. Only
+one partitioning key can be specified with this option.
+
+Object names are mapped to the lowercase equivalents as specified below
+when mapped to an HCatalog table. This includes the table name (which
+is the same as the external store table name converted to lower case)
+and field names.
+
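+As a sketch of how these options combine (the connection URL and table names
+are hypothetical), an import that creates the target table stored as text
+could look like:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name> --hcatalog-table txn --create-hcatalog-table --hcatalog-storage-stanza "stored as textfile"+
+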
+Delimited Text Formats and Field and Line Delimiter Characters
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog supports delimited text format as one of the table storage formats.
+But when delimited text is used and the imported data has fields that contain
+those delimiters, then the data may be parsed into a different number of
+fields and records by Hive, thereby losing data fidelity.
+
+For this case, one of these existing Sqoop import options can be used:
+
+* +--hive-delims-replacement+
+
+* +--hive-drop-import-delims+
+
+If either of these options is provided for import, then any column of type
+STRING will be formatted with the Hive delimiter processing and then written
+to the HCatalog table.
+
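+For instance (again with placeholder connection and table names), delimiters
+embedded in string columns can be dropped during the import with:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name> --hcatalog-table txn --hive-drop-import-delims+
+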
+HCatalog Table Requirements
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The HCatalog table should be created before using it as part of a Sqoop job
+if the default table creation options (with optional storage stanza) are not
+sufficient. All storage formats supported by HCatalog can be used when
+creating HCatalog tables. This allows the feature to readily adopt new
+storage formats as they are added to the Hive project, such as ORC files.
+
+Support for Partitioning
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+The Sqoop HCatalog feature supports the following table types:
+
+* Unpartitioned tables
+
+* Partitioned tables with a static partitioning key specified
+
+* Partitioned tables with dynamic partition keys from the database
+result set
+
+* Partitioned tables with a combination of a static key and additional
+dynamic partitioning keys
+
+Schema Mapping
+~~~~~~~~~~~~~~
+
+Sqoop currently does not support column name mapping. However, the user
+is allowed to override the type mapping. Type mapping loosely follows
+the Hive type mapping already present in Sqoop except that SQL types
+“FLOAT” and “REAL” are mapped to HCatalog type “float”. In the Sqoop type
+mapping for Hive, these two are mapped to “double”. Type mapping is primarily
+used for checking the correctness of column definitions and can be overridden
+with the +--map-column-hive+ option.
+
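+For example, a column named +amount+ (a hypothetical column name) could be
+mapped to the HCatalog +double+ type with:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name> --hcatalog-table txn --map-column-hive amount=double+
+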
+All types except binary are assignable to a String type.
+
+Any field of number type (int, smallint, tinyint, bigint and bigdecimal,
+float and double) is assignable to another field of any number type during
+exports and imports. Depending on the precision and scale of the target type
+of assignment, truncations can occur.
+
+Furthermore, date/time/timestamps are mapped to string (the full
+date/time/timestamp representation) or bigint (the number of milliseconds
+since epoch) during imports and exports.
+
+BLOBs and CLOBs are only supported for imports. The BLOB/CLOB objects when
+imported are stored in a Sqoop-specific format and knowledge of this format
+is needed for processing these objects in a Pig/Hive job or another Map Reduce
+job.
+
+Database column names are mapped to their lowercase equivalents when mapped
+to the HCatalog fields. Currently, case-sensitive database object names are
+not supported.
+
+Projection of a set of columns from a table to an HCatalog table or loading
+to a column projection is allowed, subject to table constraints. The dynamic
+partitioning columns, if any, must be part of the projection when importing
+data into HCatalog tables.
+
+Dynamic partitioning fields should be mapped to database columns that are
+defined with the NOT NULL attribute (although this is not validated). A null
+value during import for a dynamic partitioning column will abort the Sqoop
+job.
+
+Support for HCatalog Data Types
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+All the primitive HCatalog types are supported. Currently all the complex
+HCatalog types are unsupported.
+
+BLOB/CLOB database types are only supported for imports.
+
+Providing Hive and HCatalog Libraries for the Sqoop Job
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+With the support for HCatalog added to Sqoop, any HCatalog job depends on a
+set of jar files being available both on the Sqoop client host and where the
+Map/Reduce tasks run. To run HCatalog jobs, the environment variable
++HADOOP_CLASSPATH+ must be set up as shown below before launching the Sqoop
+HCatalog jobs.
+
++HADOOP_CLASSPATH=$(hcat -classpath)+
++export HADOOP_CLASSPATH+
+
+
+The necessary HCatalog dependencies will be copied to the distributed cache
+automatically by the Sqoop job.
+
+Examples
+~~~~~~~~
+
+Create an HCatalog table, such as:
+
++hcat -e "create table txn(txn_date string, cust_id string, amount float,
+store_id int) partitioned by (cust_id string) stored as rcfile;"+
+
+
+Then Sqoop import and export of the "txn" HCatalog table can be invoked as
+follows:
+
+Import
+^^^^^^
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name> --hcatalog-table txn <other sqoop options>+
+
+Export
+^^^^^^
+
++$SQOOP_HOME/bin/sqoop export --connect <jdbc-url> --table <table-name> --hcatalog-table txn <other sqoop options>+
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index f18d43e..4be6a6a 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -59,6 +59,10 @@ public class SqoopOptions implements Cloneable {
public static final String METASTORE_PASSWORD_KEY =
"sqoop.metastore.client.record.password";
+ // Default hive and hcat locations.
+ public static final String DEF_HIVE_HOME = "/usr/lib/hive";
+ public static final String DEF_HCAT_HOME = "/usr/lib/hcatalog";
+
public static final boolean METASTORE_PASSWORD_DEFAULT = false;
/**
@@ -151,6 +155,15 @@ public class SqoopOptions implements Cloneable {
private String hiveDelimsReplacement;
@StoredAsProperty("hive.partition.key") private String hivePartitionKey;
@StoredAsProperty("hive.partition.value") private String hivePartitionValue;
+ @StoredAsProperty("hcatalog.table.name")
+ private String hCatTableName;
+ @StoredAsProperty("hcatalog.database.name")
+ private String hCatDatabaseName;
+ @StoredAsProperty("hcatalog.create.table")
+ private boolean hCatCreateTable;
+ @StoredAsProperty("hcatalog.storage.stanza")
+ private String hCatStorageStanza;
+ private String hCatHome; // not serialized to metastore.
// User explicit mapping of types
private Properties mapColumnJava; // stored as map.colum.java
@@ -197,7 +210,9 @@ public class SqoopOptions implements Cloneable {
private DelimiterSet inputDelimiters; // codegen.input.delimiters.
private DelimiterSet outputDelimiters; // codegen.output.delimiters.
- private boolean areDelimsManuallySet;
+
+ private boolean areOutputDelimsManuallySet;
+ private boolean areInputDelimsManuallySet;
private Configuration conf;
@@ -580,7 +595,8 @@ public class SqoopOptions implements Cloneable {
// Delimiters were previously memoized; don't let the tool override
// them with defaults.
- this.areDelimsManuallySet = true;
+ this.areOutputDelimsManuallySet = true;
+ this.areInputDelimsManuallySet = true;
// If we loaded true verbose flag, we need to apply it
if (this.verbose) {
@@ -804,7 +820,21 @@ public class SqoopOptions implements Cloneable {
public static String getHiveHomeDefault() {
// Set this with $HIVE_HOME, but -Dhive.home can override.
String hiveHome = System.getenv("HIVE_HOME");
- return System.getProperty("hive.home", hiveHome);
+ hiveHome = System.getProperty("hive.home", hiveHome);
+ if (hiveHome == null) {
+ hiveHome = DEF_HIVE_HOME;
+ }
+ return hiveHome;
+ }
+
+ public static String getHCatHomeDefault() {
+ // Set this with $HCAT_HOME, but -Dhcatalog.home can override.
+ String hcatHome = System.getenv("HCAT_HOME");
+ hcatHome = System.getProperty("hcat.home", hcatHome);
+ if (hcatHome == null) {
+ hcatHome = DEF_HCAT_HOME;
+ }
+ return hcatHome;
}
private void initDefaults(Configuration baseConfiguration) {
@@ -813,6 +843,7 @@ public class SqoopOptions implements Cloneable {
this.hadoopMapRedHome = System.getenv("HADOOP_MAPRED_HOME");
this.hiveHome = getHiveHomeDefault();
+ this.hCatHome = getHCatHomeDefault();
this.inputDelimiters = new DelimiterSet(
DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR,
@@ -834,7 +865,8 @@ public class SqoopOptions implements Cloneable {
this.jarDirIsAuto = true;
this.layout = FileLayout.TextFile;
- this.areDelimsManuallySet = false;
+ this.areOutputDelimsManuallySet = false;
+ this.areInputDelimsManuallySet = false;
this.numMappers = DEFAULT_NUM_MAPPERS;
this.useCompression = false;
@@ -1263,6 +1295,47 @@ public class SqoopOptions implements Cloneable {
this.failIfHiveTableExists = fail;
}
+ // HCatalog support
+ public void setHCatTableName(String ht) {
+ this.hCatTableName = ht;
+ }
+
+ public String getHCatTableName() {
+ return this.hCatTableName;
+ }
+
+ public void setHCatDatabaseName(String hd) {
+ this.hCatDatabaseName = hd;
+ }
+
+ public String getHCatDatabaseName() {
+ return this.hCatDatabaseName;
+ }
+
+
+ public String getHCatHome() {
+ return hCatHome;
+ }
+
+ public void setHCatHome(String home) {
+ this.hCatHome = home;
+ }
+
+ public boolean doCreateHCatalogTable() {
+ return hCatCreateTable;
+ }
+
+ public void setCreateHCatalogTable(boolean create) {
+ this.hCatCreateTable = create;
+ }
+
+ public void setHCatStorageStanza(String stanza) {
+ this.hCatStorageStanza = stanza;
+ }
+
+ public String getHCatStorageStanza() {
+ return this.hCatStorageStanza;
+ }
/**
* @return location where .java files go; guaranteed to end with '/'.
*/
@@ -1673,18 +1746,32 @@ public class SqoopOptions implements Cloneable {
this.fetchSize = size;
}
+ /*
+ * @return true if the output delimiters have been explicitly set by the user
+ */
+ public boolean explicitOutputDelims() {
+ return areOutputDelimsManuallySet;
+ }
+
/**
- * @return true if the delimiters have been explicitly set by the user.
+ * Flag the output delimiter settings as explicit user settings, or implicit.
*/
- public boolean explicitDelims() {
- return areDelimsManuallySet;
+ public void setExplicitOutputDelims(boolean explicit) {
+ this.areOutputDelimsManuallySet = explicit;
}
/**
- * Flag the delimiter settings as explicit user settings, or implicit.
+ * @return true if the input delimiters have been explicitly set by the user.
*/
- public void setExplicitDelims(boolean explicit) {
- this.areDelimsManuallySet = explicit;
+ public boolean explicitInputDelims() {
+ return areInputDelimsManuallySet;
+ }
+
+ /**
+ * Flag the input delimiter settings as explicit user settings, or implicit.
+ */
+ public void setExplicitInputDelims(boolean explicit) {
+ this.areInputDelimsManuallySet = explicit;
}
public Configuration getConf() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index 5354063..2070b63 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -60,6 +60,18 @@ public final class ConfigurationConstants {
public static final String PROP_MAPRED_JOB_TRACKER_ADDRESS =
"mapred.job.tracker";
+ /**
+ * The Configuration property identifying the job tracker address (new).
+ */
+ public static final String PROP_MAPREDUCE_JOB_TRACKER_ADDRESS =
+ "mapreduce.jobtracker.address";
+
+ /**
+ * The Configuration property identifying the framework name. If set to YARN
+ * then we will not be in local mode.
+ */
+ public static final String PROP_MAPREDUCE_FRAMEWORK_NAME =
+ "mapreduce.framework.name";
/**
* The group name of task counters.
*/
@@ -78,6 +90,11 @@ public final class ConfigurationConstants {
public static final String COUNTER_MAP_INPUT_RECORDS =
"MAP_INPUT_RECORDS";
+ /**
+ * The name of the parameter used by ToolRunner to specify jars to add to the distributed cache.
+ */
+ public static final String MAPRED_DISTCACHE_CONF_PARAM = "tmpjars";
+
private ConfigurationConstants() {
// Disable Explicit Object Creation
}
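
As a quick illustration (not part of this patch), these constants can be consulted to decide whether a job will run with the local job runner; the check mirrors the one in JobBase.loadJars() later in this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.config.ConfigurationConstants;

    // Illustrative only: local-mode detection via the new constants.
    public class LocalModeCheckSketch {
      public static boolean usesLocalJobTracker(Configuration conf) {
        // Check both the old and the new property names.
        return "local".equals(conf.get(
              ConfigurationConstants.PROP_MAPRED_JOB_TRACKER_ADDRESS))
            || "local".equals(conf.get(
              ConfigurationConstants.PROP_MAPREDUCE_JOB_TRACKER_ADDRESS));
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(ConfigurationConstants.PROP_MAPREDUCE_FRAMEWORK_NAME, "yarn");
        // If the framework is YARN, we are not in local mode regardless of
        // the job tracker address.
        boolean local = !"yarn".equalsIgnoreCase(conf.get(
            ConfigurationConstants.PROP_MAPREDUCE_FRAMEWORK_NAME))
            && usesLocalJobTracker(conf);
        System.out.println("local mode: " + local);
      }
    }
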
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/hive/HiveImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 838f083..02596a6 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -60,6 +60,15 @@ public class HiveImport {
private ConnManager connManager;
private Configuration configuration;
private boolean generateOnly;
+ private static boolean testMode = false;
+
+ public static boolean getTestMode() {
+ return testMode;
+ }
+
+ public static void setTestMode(boolean mode) {
+ testMode = mode;
+ }
/** Entry point through which Hive invocation should be attempted. */
private static final String HIVE_MAIN_CLASS =
@@ -285,6 +294,14 @@ public class HiveImport {
throws IOException {
SubprocessSecurityManager subprocessSM = null;
+ if (testMode) {
+ // We use an external mock Hive process in test mode because the
+ // HCatalog dependency would have brought in the Hive classes.
+ LOG.debug("Using external Hive process in test mode.");
+ executeExternalHiveScript(filename, env);
+ return;
+ }
+
try {
Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index a1ac38e..3549bda 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -164,6 +164,70 @@ public abstract class ConnManager {
return HiveTypes.toHiveType(sqlType);
}
+ /**
+ * Resolve a database-specific type to an HCatalog data type. Largely follows
+ * Sqoop's Hive type translation.
+ * @param sqlType
+ * sql type
+ * @return hcat type
+ */
+ public String toHCatType(int sqlType) {
+ switch (sqlType) {
+
+ // Ideally TINYINT and SMALLINT should be mapped to their
+ // HCat equivalents tinyint and smallint respectively,
+ // but the Sqoop Java type conversion maps them to Integer,
+ // even though the referenced Javadoc clearly recommends otherwise.
+ // Changing this now could break many of the sequence file usages,
+ // as the value class implementations would change. So we
+ // just keep the same behavior here.
+ case Types.SMALLINT:
+ case Types.TINYINT:
+ case Types.INTEGER:
+ return "int";
+
+ case Types.VARCHAR:
+ case Types.CHAR:
+ case Types.LONGVARCHAR:
+ case Types.NVARCHAR:
+ case Types.NCHAR:
+ case Types.LONGNVARCHAR:
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ case Types.CLOB:
+ return "string";
+
+ case Types.FLOAT:
+ case Types.REAL:
+ return "float";
+
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ return "string";
+
+ case Types.DOUBLE:
+ return "double";
+
+ case Types.BIT:
+ case Types.BOOLEAN:
+ return "boolean";
+
+ case Types.BIGINT:
+ return "bigint";
+
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.BLOB:
+ case Types.LONGVARBINARY:
+ return "binary";
+
+ default:
+ throw new IllegalArgumentException(
+ "Cannot convert SQL type to HCatalog type " + sqlType);
+ }
+ }
+
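
For a concrete feel of the mapping (not part of this patch), a standalone sketch that mirrors a few arms of the switch above using only java.sql.Types from the JDK:

    import java.sql.Types;

    // Illustrative only: mirrors a subset of ConnManager.toHCatType().
    public class HCatTypeMappingSketch {
      static String toHCatType(int sqlType) {
        switch (sqlType) {
        case Types.SMALLINT:
        case Types.TINYINT:
        case Types.INTEGER:  return "int";     // see the note on TINYINT/SMALLINT above
        case Types.NUMERIC:
        case Types.DECIMAL:  return "string";  // decimals travel as strings
        case Types.VARBINARY:
        case Types.BLOB:     return "binary";
        default:
          throw new IllegalArgumentException("Unmapped SQL type " + sqlType);
        }
      }

      public static void main(String[] args) {
        System.out.println(toHCatType(Types.DECIMAL));  // string
        System.out.println(toHCatType(Types.SMALLINT)); // int
        System.out.println(toHCatType(Types.BLOB));     // binary
      }
    }
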
/**
* Resolve a database-specific type to Avro data type.
* @param sqlType sql type
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index ef1d363..5afd90c 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -30,6 +31,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.lib.LargeObjectLoader;
@@ -63,6 +65,13 @@ public class DataDrivenImportJob extends ImportJobBase {
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
+ if (isHCatJob) {
+ LOG.info("Configuring mapper for HCatalog import job");
+ job.setOutputKeyClass(LongWritable.class);
+ job.setOutputValueClass(SqoopHCatUtilities.getImportValueClass());
+ job.setMapperClass(SqoopHCatUtilities.getImportMapperClass());
+ return;
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
// For text files, specify these as the output types; for
// other types, we just use the defaults.
@@ -82,6 +91,9 @@ public class DataDrivenImportJob extends ImportJobBase {
@Override
protected Class<? extends Mapper> getMapperClass() {
+ if (options.getHCatTableName() != null) {
+ return SqoopHCatUtilities.getImportMapperClass();
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return TextImportMapper.class;
} else if (options.getFileLayout()
@@ -98,6 +110,10 @@ public class DataDrivenImportJob extends ImportJobBase {
@Override
protected Class<? extends OutputFormat> getOutputFormatClass()
throws ClassNotFoundException {
+ if (isHCatJob) {
+ LOG.debug("Returning HCatOutputFormat for output format");
+ return SqoopHCatUtilities.getOutputFormatClass();
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return RawKeyTextOutputFormat.class;
} else if (options.getFileLayout()
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 1065d0b..d0be570 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.SqoopOptions;
@@ -57,7 +58,7 @@ public class ExportJobBase extends JobBase {
* The (inferred) type of a file or group of files.
*/
public enum FileType {
- SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+ SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, UNKNOWN
}
public static final Log LOG = LogFactory.getLog(
@@ -80,6 +81,7 @@ public class ExportJobBase extends JobBase {
protected ExportJobContext context;
+
public ExportJobBase(final ExportJobContext ctxt) {
this(ctxt, null, null, null);
}
@@ -195,6 +197,9 @@ public class ExportJobBase extends JobBase {
* @return the Path to the files we are going to export to the db.
*/
protected Path getInputPath() throws IOException {
+ if (isHCatJob) {
+ return null;
+ }
Path inputPath = new Path(context.getOptions().getExportDir());
Configuration conf = options.getConf();
inputPath = inputPath.makeQualified(FileSystem.get(conf));
@@ -207,7 +212,9 @@ public class ExportJobBase extends JobBase {
throws ClassNotFoundException, IOException {
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
- FileInputFormat.addInputPath(job, getInputPath());
+ if (!isHCatJob) {
+ FileInputFormat.addInputPath(job, getInputPath());
+ }
}
@Override
@@ -371,6 +378,12 @@ public class ExportJobBase extends JobBase {
}
propagateOptionsToJob(job);
+ if (isHCatJob) {
+ LOG.info("Configuring HCatalog for export job");
+ SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
+ hCatUtils.configureHCat(options, job, cmgr, tableName,
+ job.getConfiguration());
+ }
configureInputFormat(job, tableName, tableClassName, null);
configureOutputFormat(job, tableName, tableClassName);
configureMapper(job, tableName, tableClassName);
@@ -448,6 +461,9 @@ public class ExportJobBase extends JobBase {
}
protected FileType getInputFileType() {
+ if (isHCatJob) {
+ return FileType.HCATALOG_MANAGED_FILE;
+ }
try {
return getFileType(context.getOptions().getConf(), getInputPath());
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 2465f3f..ab7f21e 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
@@ -92,6 +93,13 @@ public class ImportJobBase extends JobBase {
job.setOutputFormatClass(getOutputFormatClass());
+ if (isHCatJob) {
+ LOG.debug("Configuring output format for HCatalog import job");
+ SqoopHCatUtilities.configureImportOutputFormat(options, job,
+ getContext().getConnManager(), tableName, job.getConfiguration());
+ return;
+ }
+
if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
job.getConfiguration().set("mapred.output.value.class", tableClassName);
}
@@ -149,6 +157,11 @@ public class ImportJobBase extends JobBase {
perfCounters.startClock();
boolean success = doSubmitJob(job);
+
+ if (isHCatJob) {
+ SqoopHCatUtilities.instance().invokeOutputCommitterForLocalMode(job);
+ }
+
perfCounters.stopClock();
Counters jobCounters = job.getCounters();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 20636a0..fee78e0 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -65,7 +66,11 @@ public class JdbcExportJob extends ExportJobBase {
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
- if (fileType == FileType.AVRO_DATA_FILE) {
+ if (isHCatJob) {
+ SqoopHCatUtilities.configureExportInputFormat(options, job,
+ context.getConnManager(), tableName, job.getConfiguration());
+ return;
+ } else if (fileType == FileType.AVRO_DATA_FILE) {
LOG.debug("Configuring for Avro export");
ConnManager connManager = context.getConnManager();
Map<String, Integer> columnTypeInts;
@@ -93,6 +98,9 @@ public class JdbcExportJob extends ExportJobBase {
@Override
protected Class<? extends InputFormat> getInputFormatClass()
throws ClassNotFoundException {
+ if (isHCatJob) {
+ return SqoopHCatUtilities.getInputFormatClass();
+ }
if (fileType == FileType.AVRO_DATA_FILE) {
return AvroInputFormat.class;
}
@@ -101,6 +109,9 @@ public class JdbcExportJob extends ExportJobBase {
@Override
protected Class<? extends Mapper> getMapperClass() {
+ if (isHCatJob) {
+ return SqoopHCatUtilities.getExportMapperClass();
+ }
switch (fileType) {
case SEQUENCE_FILE:
return SequenceFileExportMapper.class;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/JobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java
index 0df1156..322df1c 100644
--- a/src/java/org/apache/sqoop/mapreduce/JobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java
@@ -56,6 +56,7 @@ public class JobBase {
private Job mrJob;
private ClassLoader prevClassLoader = null;
+ protected final boolean isHCatJob;
public static final String PROPERTY_VERBOSE = "sqoop.verbose";
@@ -76,6 +77,7 @@ public class JobBase {
this.mapperClass = mapperClass;
this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
+ isHCatJob = options.getHCatTableName() != null;
}
/**
@@ -220,7 +222,7 @@ public class JobBase {
*/
protected void loadJars(Configuration conf, String ormJarFile,
String tableClassName) throws IOException {
-
+
boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
|| "local".equals(conf.get("mapred.job.tracker"));
if (isLocal) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
new file mode 100644
index 0000000..47febf7
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+
+/**
+ * A combining HCatInputFormat equivalent that lets us limit the number
+ * of splits to the number of map tasks.
+ *
+ * The logic is simple. We get the list of splits from HCatInputFormat. If it
+ * is no larger than the number of mappers, all is good. Otherwise, we sort the
+ * splits by size and assign them to the mappers in rounds. Each new round
+ * starts with the mapper that received the last split of the previous round,
+ * so the total split size is distributed more uniformly than with a simple
+ * round-robin assignment.
+ */
+public class SqoopHCatExportFormat extends HCatInputFormat {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatExportFormat.class.getName());
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job)
+ throws IOException, InterruptedException {
+ List<InputSplit> hCatSplits = super.getSplits(job);
+ int hCatSplitCount = hCatSplits.size();
+ int expectedSplitCount = ExportInputFormat.getNumMapTasks(job);
+ if (expectedSplitCount == 0) {
+ expectedSplitCount = hCatSplitCount;
+ }
+ LOG.debug("Expected split count " + expectedSplitCount);
+ LOG.debug("HCatInputFormat provided split count " + hCatSplitCount);
+ // Sort the splits by length descending.
+
+ Collections.sort(hCatSplits, new Comparator<InputSplit>() {
+ @Override
+ public int compare(InputSplit is1, InputSplit is2) {
+ try {
+ return (int) (is2.getLength() - is1.getLength());
+ } catch (Exception e) {
+ LOG.warn("Exception caught while sorting Input splits " + e);
+ }
+ return 0;
+ }
+ });
+ List<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+
+ // The number of splits generated by HCatInputFormat is within
+ // our limits
+
+ if (hCatSplitCount <= expectedSplitCount) {
+ for (InputSplit split : hCatSplits) {
+ List<InputSplit> hcSplitList = new ArrayList<InputSplit>();
+ hcSplitList.add(split);
+ combinedSplits.add(new SqoopHCatInputSplit(hcSplitList));
+ }
+ return combinedSplits;
+ }
+ List<List<InputSplit>> combinedSplitList =
+ new ArrayList<List<InputSplit>>();
+ for (int i = 0; i < expectedSplitCount; i++) {
+ combinedSplitList.add(new ArrayList<InputSplit>());
+ }
+ boolean ascendingAssignment = true;
+
+ int lastSet = 0;
+ for (int i = 0; i < hCatSplitCount; ++i) {
+ int splitNum = i % expectedSplitCount;
+ int currentSet = i / expectedSplitCount;
+ if (currentSet != lastSet) {
+ ascendingAssignment = !ascendingAssignment;
+ }
+ if (ascendingAssignment) {
+ combinedSplitList.get(splitNum).add(hCatSplits.get(i));
+ } else {
+ combinedSplitList.
+ get(expectedSplitCount - 1 - splitNum).add(hCatSplits.get(i));
+ }
+ lastSet = currentSet;
+ }
+ for (int i = 0; i < expectedSplitCount; i++) {
+ SqoopHCatInputSplit sqoopSplit =
+ new SqoopHCatInputSplit(combinedSplitList.get(i));
+ combinedSplits.add(sqoopSplit);
+ }
+
+ return combinedSplits;
+
+ }
+
+ @Override
+ public RecordReader<WritableComparable, HCatRecord>
+ createRecordReader(InputSplit split,
+ TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ LOG.debug("Creating a SqoopHCatRecordReader");
+ return new SqoopHCatRecordReader(split, taskContext, this);
+ }
+
+ public RecordReader<WritableComparable, HCatRecord>
+ createHCatRecordReader(InputSplit split,
+ TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ LOG.debug("Creating a base HCatRecordReader");
+ return super.createRecordReader(split, taskContext);
+ }
+}
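
To make the assignment scheme in getSplits() concrete, here is a standalone sketch (not part of this patch) that runs the same serpentine logic over plain long lengths standing in for HCat splits:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Illustrative only: the split-to-mapper assignment used above, with
    // split lengths instead of real InputSplit objects.
    public class SerpentineAssignmentSketch {
      public static void main(String[] args) {
        List<Long> splits = new ArrayList<Long>(
            Arrays.asList(90L, 70L, 60L, 40L, 30L, 20L, 10L));
        int mappers = 3;
        Collections.sort(splits, Collections.reverseOrder()); // largest first
        List<List<Long>> buckets = new ArrayList<List<Long>>();
        for (int i = 0; i < mappers; i++) {
          buckets.add(new ArrayList<Long>());
        }
        boolean ascending = true;
        int lastSet = 0;
        for (int i = 0; i < splits.size(); ++i) {
          int slot = i % mappers;
          int currentSet = i / mappers;
          if (currentSet != lastSet) {
            ascending = !ascending; // reverse direction on every new round
          }
          if (ascending) {
            buckets.get(slot).add(splits.get(i));
          } else {
            buckets.get(mappers - 1 - slot).add(splits.get(i));
          }
          lastSet = currentSet;
        }
        // Prints [[90, 20, 10], [70, 30], [60, 40]]: totals of 120/100/100,
        // noticeably more even than a plain round robin would give.
        System.out.println(buckets);
      }
    }
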
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
new file mode 100644
index 0000000..539cedf
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+
+/**
+ * A mapper that works on combined HCat splits.
+ */
+public class SqoopHCatExportMapper
+ extends
+ AutoProgressMapper<WritableComparable, HCatRecord,
+ SqoopRecord, WritableComparable> {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatExportMapper.class.getName());
+ private InputJobInfo jobInfo;
+ private HCatSchema hCatFullTableSchema;
+ private List<HCatFieldSchema> hCatSchemaFields;
+
+ private SqoopRecord sqoopRecord;
+ private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+ private static final String TIME_TYPE = "java.sql.Time";
+ private static final String DATE_TYPE = "java.sql.Date";
+ private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+ private static final String FLOAT_TYPE = "Float";
+ private static final String DOUBLE_TYPE = "Double";
+ private static final String BYTE_TYPE = "Byte";
+ private static final String SHORT_TYPE = "Short";
+ private static final String INTEGER_TYPE = "Integer";
+ private static final String LONG_TYPE = "Long";
+ private static final String BOOLEAN_TYPE = "Boolean";
+ private static final String STRING_TYPE = "String";
+ private static final String BYTESWRITABLE =
+ "org.apache.hadoop.io.BytesWritable";
+ private static boolean debugHCatExportMapper = false;
+ private MapWritable colTypesJava;
+ private MapWritable colTypesSql;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+
+ colTypesJava = DefaultStringifier.load(conf,
+ SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA, MapWritable.class);
+ colTypesSql = DefaultStringifier.load(conf,
+ SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL, MapWritable.class);
+ // Instantiate a copy of the user's class to hold and parse the record.
+
+ String recordClassName = conf.get(
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+ if (null == recordClassName) {
+ throw new IOException("Export table class name ("
+ + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ + ") is not set!");
+ }
+ debugHCatExportMapper = conf.getBoolean(
+ SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
+ try {
+ Class cls = Class.forName(recordClassName, true,
+ Thread.currentThread().getContextClassLoader());
+ sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+
+ if (null == sqoopRecord) {
+ throw new IOException("Could not instantiate object of type "
+ + recordClassName);
+ }
+
+ String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ jobInfo =
+ (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+ HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
+ HCatSchema partitionSchema =
+ jobInfo.getTableInfo().getPartitionColumns();
+ hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
+ for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+ hCatFullTableSchema.append(hfs);
+ }
+ hCatSchemaFields = hCatFullTableSchema.getFields();
+
+ }
+
+ @Override
+ public void map(WritableComparable key, HCatRecord value,
+ Context context)
+ throws IOException, InterruptedException {
+ context.write(convertToSqoopRecord(value), NullWritable.get());
+ }
+
+ private SqoopRecord convertToSqoopRecord(HCatRecord hcr)
+ throws IOException {
+ Text key = new Text();
+ for (Map.Entry<String, Object> e : sqoopRecord.getFieldMap().entrySet()) {
+ String colName = e.getKey();
+ String hfn = colName.toLowerCase();
+ key.set(hfn);
+ String javaColType = colTypesJava.get(key).toString();
+ int sqlType = ((IntWritable) colTypesSql.get(key)).get();
+ HCatFieldSchema field =
+ hCatFullTableSchema.get(hfn);
+ HCatFieldSchema.Type fieldType = field.getType();
+ Object hCatVal =
+ hcr.get(hfn, hCatFullTableSchema);
+ String hCatTypeString = field.getTypeString();
+ Object sqlVal = convertToSqoop(hCatVal, fieldType,
+ javaColType, hCatTypeString);
+ if (debugHCatExportMapper) {
+ LOG.debug("hCatVal " + hCatVal + " of type "
+ + (hCatVal == null ? null : hCatVal.getClass().getName())
+ + ",sqlVal " + sqlVal + " of type "
+ + (sqlVal == null ? null : sqlVal.getClass().getName())
+ + ",java type " + javaColType + ", sql type = "
+ + SqoopHCatUtilities.sqlTypeString(sqlType));
+ }
+ sqoopRecord.setField(colName, sqlVal);
+ }
+ return sqoopRecord;
+ }
+
+ private Object convertToSqoop(Object val,
+ HCatFieldSchema.Type fieldType, String javaColType,
+ String hCatTypeString) throws IOException {
+
+ if (val == null) {
+ return null;
+ }
+
+ switch (fieldType) {
+ case INT:
+ case TINYINT:
+ case SMALLINT:
+ case FLOAT:
+ case DOUBLE:
+ val = convertNumberTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BOOLEAN:
+ val = convertBooleanTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BIGINT:
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date((Long) val);
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time((Long) val);
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp((Long) val);
+ } else {
+ val = convertNumberTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ }
+ break;
+ case STRING:
+ val = convertStringTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BINARY:
+ val = convertBinaryTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new IOException("Cannot convert HCatalog type "
+ + fieldType);
+ }
+ LOG.error("Cannot convert HCatalog object of "
+ + " type " + hCatTypeString + " to java object type "
+ + javaColType);
+ return null;
+ }
+
+ private Object convertBinaryTypes(Object val, String javaColType) {
+ byte[] bb = (byte[]) val;
+ if (javaColType.equals(BYTESWRITABLE)) {
+ BytesWritable bw = new BytesWritable();
+ bw.set(bb, 0, bb.length);
+ return bw;
+ }
+ return null;
+ }
+
+ private Object convertStringTypes(Object val, String javaColType) {
+ String valStr = val.toString();
+ if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(valStr);
+ } else if (javaColType.equals(DATE_TYPE)
+ || javaColType.equals(TIME_TYPE)
+ || javaColType.equals(TIMESTAMP_TYPE)) {
+ // Oracle, depending on the version, expects timestamps even for Date
+ // columns by default, so allow all date/time types to be assignment
+ // compatible with one another.
+ if (valStr.length() == 10) { // Date in yyyy-mm-dd format
+ Date d = Date.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return d;
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time(d.getTime());
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp(d.getTime());
+ }
+ } else if (valStr.length() == 8) { // time in hh:mm:ss
+ Time t = Time.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date(t.getTime());
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return t;
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp(t.getTime());
+ }
+ } else if (valStr.length() == 19) { // timestamp in yyyy-mm-dd hh:mm:ss
+ Timestamp ts = Timestamp.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date(ts.getTime());
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time(ts.getTime());
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return ts;
+ }
+ } else {
+ return null;
+ }
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return valStr;
+ } else if (javaColType.equals(BOOLEAN_TYPE)) {
+ return Boolean.valueOf(valStr);
+ } else if (javaColType.equals(BYTE_TYPE)) {
+ return Byte.parseByte(valStr);
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return Short.parseShort(valStr);
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return Integer.parseInt(valStr);
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return Long.parseLong(valStr);
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return Float.parseFloat(valStr);
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return Double.parseDouble(valStr);
+ }
+ return null;
+ }
+
+ private Object convertBooleanTypes(Object val, String javaColType) {
+ Boolean b = (Boolean) val;
+ if (javaColType.equals(BOOLEAN_TYPE)) {
+ return b;
+ } else if (javaColType.equals(BYTE_TYPE)) {
+ return (byte) (b ? 1 : 0);
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return (short) (b ? 1 : 0);
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return (int) (b ? 1 : 0);
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return (long) (b ? 1 : 0);
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return (float) (b ? 1 : 0);
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return (double) (b ? 1 : 0);
+ } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(b ? 1 : 0);
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return val.toString();
+ }
+ return null;
+ }
+
+ private Object convertNumberTypes(Object val, String javaColType) {
+ Number n = (Number) val;
+ if (javaColType.equals(BYTE_TYPE)) {
+ return n.byteValue();
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return n.shortValue();
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return n.intValue();
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return n.longValue();
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return n.floatValue();
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return n.doubleValue();
+ } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(n.doubleValue());
+ } else if (javaColType.equals(BOOLEAN_TYPE)) {
+ return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return n.toString();
+ }
+ return null;
+ }
+
+}
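
The length-based date/time parsing in convertStringTypes() above can be tried in isolation; this sketch is not part of the patch and uses only JDK classes:

    import java.sql.Date;
    import java.sql.Time;
    import java.sql.Timestamp;

    // Illustrative only: pick a temporal type from the string layout,
    // as convertStringTypes() does for DATE/TIME/TIMESTAMP columns.
    public class DateStringSketch {
      static Object parseTemporal(String valStr) {
        if (valStr.length() == 10) {        // yyyy-mm-dd
          return Date.valueOf(valStr);
        } else if (valStr.length() == 8) {  // hh:mm:ss
          return Time.valueOf(valStr);
        } else if (valStr.length() == 19) { // yyyy-mm-dd hh:mm:ss
          return Timestamp.valueOf(valStr);
        }
        return null; // unrecognized layout
      }

      public static void main(String[] args) {
        System.out.println(parseTemporal("2013-06-07"));          // java.sql.Date
        System.out.println(parseTemporal("16:34:34"));            // java.sql.Time
        System.out.println(parseTemporal("2013-06-07 16:34:34")); // java.sql.Timestamp
      }
    }
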
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
new file mode 100644
index 0000000..4f0ff1b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.FieldFormatter;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+
+/**
+ * A mapper for HCatalog import.
+ */
+public class SqoopHCatImportMapper extends
+ SqoopMapper<WritableComparable, SqoopRecord,
+ WritableComparable, HCatRecord> {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatImportMapper.class.getName());
+
+ private static boolean debugHCatImportMapper = false;
+
+ private InputJobInfo jobInfo;
+ private HCatSchema hCatFullTableSchema;
+ private int fieldCount;
+ private boolean bigDecimalFormatString;
+ private LargeObjectLoader lobLoader;
+ private HCatSchema partitionSchema = null;
+ private HCatSchema dataColsSchema = null;
+ private String stringDelimiterReplacements = null;
+ private ArrayWritable delimCharsArray;
+ private String hiveDelimsReplacement;
+ private boolean doHiveDelimsReplacement = false;
+ private DelimiterSet hiveDelimiters;
+ private String staticPartitionKey;
+ private int[] hCatFieldPositions;
+ private int colCount = -1;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ jobInfo =
+ (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+ dataColsSchema = jobInfo.getTableInfo().getDataColumns();
+ partitionSchema =
+ jobInfo.getTableInfo().getPartitionColumns();
+ StringBuilder storerInfoStr = new StringBuilder(1024);
+ StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
+ storerInfoStr.append("HCatalog Storer Info : ")
+ .append("\n\tHandler = ").append(storerInfo.getStorageHandlerClass())
+ .append("\n\tInput format class = ").append(storerInfo.getIfClass())
+ .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
+ .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
+ Properties storerProperties = storerInfo.getProperties();
+ if (!storerProperties.isEmpty()) {
+ storerInfoStr.append("\nStorer properties ");
+ for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
+ String key = (String) entry.getKey();
+ Object val = entry.getValue();
+ storerInfoStr.append("\n\t").append(key).append('=').append(val);
+ }
+ }
+ storerInfoStr.append("\n");
+ LOG.info(storerInfoStr);
+
+ hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
+ for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+ hCatFullTableSchema.append(hfs);
+ }
+ fieldCount = hCatFullTableSchema.size();
+ lobLoader = new LargeObjectLoader(conf,
+ new Path(jobInfo.getTableInfo().getTableLocation()));
+ bigDecimalFormatString = conf.getBoolean(
+ ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+ ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+ debugHCatImportMapper = conf.getBoolean(
+ SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
+ IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
+ SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
+ hiveDelimiters = new DelimiterSet(
+ (char) delimChars[0].get(), (char) delimChars[1].get(),
+ (char) delimChars[2].get(), (char) delimChars[3].get(),
+ delimChars[4].get() == 1 ? true : false);
+ hiveDelimsReplacement =
+ conf.get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
+ if (hiveDelimsReplacement == null) {
+ hiveDelimsReplacement = "";
+ }
+ doHiveDelimsReplacement = Boolean.valueOf(conf.get(
+ SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));
+
+ IntWritable[] fPos = DefaultStringifier.loadArray(conf,
+ SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
+ hCatFieldPositions = new int[fPos.length];
+ for (int i = 0; i < fPos.length; ++i) {
+ hCatFieldPositions[i] = fPos[i].get();
+ }
+
+ LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
+ LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
+ LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
+ staticPartitionKey =
+ conf.get(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
+ LOG.debug("Static partition key used : " + staticPartitionKey);
+
+
+ }
+
+ @Override
+ public void map(WritableComparable key, SqoopRecord value,
+ Context context)
+ throws IOException, InterruptedException {
+
+ try {
+ // Loading of LOBs was delayed until we have a Context.
+ value.loadLargeObjects(lobLoader);
+ } catch (SQLException sqlE) {
+ throw new IOException(sqlE);
+ }
+ if (colCount == -1) {
+ colCount = value.getFieldMap().size();
+ }
+ context.write(key, convertToHCatRecord(value));
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ if (null != lobLoader) {
+ lobLoader.close();
+ }
+ }
+
+ private HCatRecord convertToHCatRecord(SqoopRecord sqr)
+ throws IOException {
+ Map<String, Object> fieldMap = sqr.getFieldMap();
+ HCatRecord result = new DefaultHCatRecord(fieldCount);
+
+ for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String hfn = key.toLowerCase();
+ if (staticPartitionKey != null && staticPartitionKey.equals(hfn)) {
+ continue;
+ }
+ HCatFieldSchema hfs = hCatFullTableSchema.get(hfn);
+ if (debugHCatImportMapper) {
+ LOG.debug("SqoopRecordVal: field = " + key + " Val " + val
+ + " of type " + (val == null ? null : val.getClass().getName())
+ + ", hcattype " + hfs.getTypeString());
+ }
+ Object hCatVal = toHCat(val, hfs.getType(), hfs.getTypeString());
+
+ result.set(hfn, hCatFullTableSchema, hCatVal);
+ }
+
+ return result;
+ }
+
+
+ private Object toHCat(Object val, HCatFieldSchema.Type hfsType,
+ String hCatTypeString) {
+
+ if (val == null) {
+ return null;
+ }
+
+ Object retVal = null;
+
+ if (val instanceof Number) {
+ retVal = convertNumberTypes(val, hfsType);
+ } else if (val instanceof Boolean) {
+ retVal = convertBooleanTypes(val, hfsType);
+ } else if (val instanceof String) {
+ if (hfsType == HCatFieldSchema.Type.STRING) {
+ String str = (String) val;
+ if (doHiveDelimsReplacement) {
+ retVal = FieldFormatter
+ .hiveStringReplaceDelims(str, hiveDelimsReplacement,
+ hiveDelimiters);
+ } else {
+ retVal = str;
+ }
+ }
+ } else if (val instanceof java.util.Date) {
+ retVal = convertDateTypes(val, hfsType);
+ } else if (val instanceof BytesWritable) {
+ if (hfsType == HCatFieldSchema.Type.BINARY) {
+ BytesWritable bw = (BytesWritable) val;
+ retVal = bw.getBytes();
+ }
+ } else if (val instanceof BlobRef) {
+ if (hfsType == HCatFieldSchema.Type.BINARY) {
+ BlobRef br = (BlobRef) val;
+ byte[] bytes = br.isExternal() ? br.toString().getBytes()
+ : br.getData();
+ retVal = bytes;
+ }
+ } else if (val instanceof ClobRef) {
+ if (hfsType == HCatFieldSchema.Type.STRING) {
+ ClobRef cr = (ClobRef) val;
+ String s = cr.isExternal() ? cr.toString() : cr.getData();
+ retVal = s;
+ }
+ } else {
+ throw new UnsupportedOperationException("Objects of type "
+ + val.getClass().getName() + " are not supported");
+ }
+ if (retVal == null) {
+ LOG.error("Objects of type "
+ + val.getClass().getName() + " can not be mapped to HCatalog type "
+ + hCatTypeString);
+ }
+ return retVal;
+ }
+
+ private Object convertDateTypes(Object val,
+ HCatFieldSchema.Type hfsType) {
+ if (val instanceof java.sql.Date) {
+ if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return ((Date) val).getTime();
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return val.toString();
+ }
+ } else if (val instanceof java.sql.Time) {
+ if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return ((Time) val).getTime();
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return val.toString();
+ }
+ } else if (val instanceof java.sql.Timestamp) {
+ if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return ((Timestamp) val).getTime();
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return val.toString();
+ }
+ }
+ return null;
+ }
+
+ private Object convertBooleanTypes(Object val,
+ HCatFieldSchema.Type hfsType) {
+ Boolean b = (Boolean) val;
+ if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+ return b;
+ } else if (hfsType == HCatFieldSchema.Type.TINYINT) {
+ return (byte) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+ return (short) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.INT) {
+ return (int) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return (long) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+ return (float) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+ return (double) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return val.toString();
+ }
+ return null;
+ }
+
+ private Object convertNumberTypes(Object val,
+ HCatFieldSchema.Type hfsType) {
+ if (!(val instanceof Number)) {
+ return null;
+ }
+ if (val instanceof BigDecimal && hfsType == HCatFieldSchema.Type.STRING) {
+ BigDecimal bd = (BigDecimal) val;
+ if (bigDecimalFormatString) {
+ return bd.toPlainString();
+ } else {
+ return bd.toString();
+ }
+ }
+ Number n = (Number) val;
+ if (hfsType == HCatFieldSchema.Type.TINYINT) {
+ return n.byteValue();
+ } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+ return n.shortValue();
+ } else if (hfsType == HCatFieldSchema.Type.INT) {
+ return n.intValue();
+ } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return n.longValue();
+ } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+ return n.floatValue();
+ } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+ return n.doubleValue();
+ } else if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+ return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return n.toString();
+ }
+ return null;
+ }
+}
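
A small standalone illustration (not part of this patch) of how convertDateTypes()/toHCat() above represent temporal values: epoch milliseconds for HCat bigint columns, toString() for HCat string columns:

    import java.sql.Timestamp;

    // Illustrative only: the two HCat-friendly representations of a Timestamp.
    public class TimestampMappingSketch {
      public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2013-06-07 16:34:34");
        long asBigint = ts.getTime();    // value written to an HCat bigint column
        String asString = ts.toString(); // value written to an HCat string column
        System.out.println(asBigint + " / " + asString);
      }
    }
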
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
new file mode 100644
index 0000000..5a2e48a
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * An abstraction over a set of combined HCatSplits.
+ */
+public class SqoopHCatInputSplit extends InputSplit implements Writable {
+ private List<HCatSplit> hCatSplits;
+ private String[] hCatLocations;
+ private long inputLength;
+
+ public SqoopHCatInputSplit() {
+ }
+
+ public SqoopHCatInputSplit(List<InputSplit> splits) {
+ hCatSplits = new ArrayList<HCatSplit>();
+ Set<String> locations = new HashSet<String>();
+ for (int i = 0; i < splits.size(); ++i) {
+ HCatSplit hsSplit = (HCatSplit) splits.get(i);
+ hCatSplits.add(hsSplit);
+ this.inputLength += hsSplit.getLength();
+ locations.addAll(Arrays.asList(hsSplit.getLocations()));
+ }
+ this.hCatLocations = locations.toArray(new String[0]);
+ }
+
+ public int length() {
+ return this.hCatSplits.size();
+ }
+
+ public HCatSplit get(int index) {
+ return this.hCatSplits.get(index);
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ if (this.inputLength == 0L) {
+ for (HCatSplit split : this.hCatSplits) {
+ this.inputLength += split.getLength();
+ }
+ }
+ return this.inputLength;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ if (this.hCatLocations == null) {
+ Set<String> locations = new HashSet<String>();
+ for (HCatSplit split : this.hCatSplits) {
+ locations.addAll(Arrays.asList(split.getLocations()));
+ }
+ this.hCatLocations = locations.toArray(new String[0]);
+ }
+ return this.hCatLocations;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(this.inputLength);
+ out.writeInt(this.hCatSplits.size());
+ for (HCatSplit split : this.hCatSplits) {
+ split.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.inputLength = in.readLong();
+ int size = in.readInt();
+ this.hCatSplits = new ArrayList<HCatSplit>(size);
+ for (int i = 0; i < size; ++i) {
+ HCatSplit hs = new HCatSplit();
+ hs.readFields(in);
+ hCatSplits.add(hs);
+ }
+ }
+}
+
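
The serialization contract above (total length first, then the number of wrapped splits, then each split) can be sanity-checked with plain java.io streams; the sketch below is not part of the patch and omits the nested HCatSplit payloads:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Illustrative only: the header portion of write()/readFields() round-tripped.
    public class WritableRoundTripSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(1024L); // stands in for this.inputLength
        out.writeInt(2);      // stands in for this.hCatSplits.size()
        out.flush();

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(in.readLong() + " bytes across " + in.readInt() + " splits");
      }
    }
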
[2/3] SQOOP-931: Integrate HCatalog with Sqoop
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatRecordReader.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatRecordReader.java
new file mode 100644
index 0000000..55604f7
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatRecordReader.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * A Record Reader that can combine underlying splits.
+ */
+public class SqoopHCatRecordReader extends
+ RecordReader<WritableComparable, HCatRecord> {
+ private final SqoopHCatExportFormat hCatExportFormat;
+ private SqoopHCatInputSplit hCatSplit;
+ private TaskAttemptContext context;
+ private int subIndex;
+ private long progress;
+
+ private RecordReader<WritableComparable, HCatRecord> curReader;
+
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatRecordReader.class.getName());
+
+ public SqoopHCatRecordReader(final InputSplit split,
+ final TaskAttemptContext context, final SqoopHCatExportFormat inputFormat)
+ throws IOException {
+ this.hCatSplit = (SqoopHCatInputSplit) split;
+ this.context = context;
+ this.subIndex = 0;
+ this.curReader = null;
+ this.progress = 0L;
+ this.hCatExportFormat = inputFormat;
+
+ initNextRecordReader();
+ }
+
+ @Override
+ public void initialize(final InputSplit split,
+ final TaskAttemptContext ctxt)
+ throws IOException, InterruptedException {
+ this.hCatSplit = (SqoopHCatInputSplit) split;
+ this.context = ctxt;
+
+ if (null != this.curReader) {
+ this.curReader.initialize(((SqoopHCatInputSplit) split)
+ .get(0), context);
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ while (this.curReader == null || !this.curReader.nextKeyValue()) {
+ if (!initNextRecordReader()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public WritableComparable getCurrentKey() throws IOException,
+ InterruptedException {
+ return this.curReader.getCurrentKey();
+ }
+
+ @Override
+ public HCatRecord getCurrentValue() throws IOException, InterruptedException {
+ return this.curReader.getCurrentValue();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.curReader != null) {
+ this.curReader.close();
+ this.curReader = null;
+ }
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ long subprogress = 0L;
+ if (null != this.curReader) {
+ subprogress = (long) (this.curReader.getProgress()
+ * this.hCatSplit.get(this.subIndex - 1).getLength());
+ }
+ // Report overall progress across all of the sub-splits.
+ return Math.min(1.0F, (this.progress + subprogress)
+ / (float) this.hCatSplit.getLength());
+ }
+
+ protected boolean initNextRecordReader() throws IOException {
+ if (this.curReader != null) {
+ // close current record reader if open
+ this.curReader.close();
+ this.curReader = null;
+ if (this.subIndex > 0) {
+ this.progress +=
+ this.hCatSplit.get(this.subIndex - 1).getLength();
+ }
+ LOG.debug("Closed current reader. Current progress = " + progress);
+ }
+
+ if (this.subIndex == this.hCatSplit.length()) {
+ LOG.debug("Done with all splits");
+ return false;
+ }
+
+ try {
+ // get a record reader for the subsplit-index chunk
+
+ this.curReader = this.hCatExportFormat.createHCatRecordReader(
+ this.hCatSplit.get(this.subIndex), this.context);
+
+ LOG.debug("Created a HCatRecordReader for split " + subIndex);
+ // initialize() for the first RecordReader will be called by MapTask;
+ // we're responsible for initializing subsequent RecordReaders.
+ if (this.subIndex > 0) {
+ this.curReader.initialize(this.hCatSplit.get(this.subIndex),
+ this.context);
+ LOG.info("Initialized reader with current split");
+ }
+ } catch (Exception e) {
+ throw new IOException("Error initializing HCat record reader", e);
+ }
+ LOG.debug("Created record reader for subsplit " + subIndex);
+ ++this.subIndex;
+ return true;
+ }
+}
+
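
For clarity, a standalone sketch (not part of this patch) of the progress arithmetic in getProgress(): bytes of finished sub-splits plus the current reader's fractional progress, divided by the combined split length (all numbers invented):

    // Illustrative only: mirrors SqoopHCatRecordReader.getProgress().
    public class ProgressSketch {
      public static void main(String[] args) {
        long completedBytes = 300L;   // lengths of sub-splits already finished
        long currentLength = 200L;    // length of the sub-split being read
        float currentFraction = 0.5f; // what the wrapped reader reports
        long totalLength = 1000L;     // SqoopHCatInputSplit.getLength()

        long subprogress = (long) (currentFraction * currentLength);
        float progress = Math.min(1.0F,
            (completedBytes + subprogress) / (float) totalLength);
        System.out.println(progress); // 0.4
      }
    }
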
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java
new file mode 100644
index 0000000..a109b40
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java
@@ -0,0 +1,1215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.apache.sqoop.hive.HiveTypes;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.LoggingAsyncSink;
+import org.apache.sqoop.util.SubprocessSecurityManager;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.util.ExitSecurityException;
+
+/**
+ * Utility methods for the HCatalog support for Sqoop.
+ */
+public final class SqoopHCatUtilities {
+ public static final String DEFHCATDB = "default";
+ public static final String HIVESITEXMLPATH = "/conf/hive-site.xml";
+ public static final String HCATSHAREDIR = "share/hcatalog";
+ public static final String DEFLIBDIR = "lib";
+ public static final String TEXT_FORMAT_IF_CLASS =
+ "org.apache.hadoop.mapred.TextInputFormat";
+ public static final String TEXT_FORMAT_OF_CLASS =
+ "org.apache.hadoop.mapred.TextOutputFormat";
+ public static final String TEXT_FORMAT_SERDE_CLASS =
+ "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+ public static final String HCAT_DB_OUTPUT_COLTYPES_JAVA =
+ "sqoop.hcat.db.output.coltypes.java";
+ public static final String HCAT_DB_OUTPUT_COLTYPES_SQL =
+ "sqoop.hcat.db.output.coltypes.sql";
+ public static final String HCAT_CLI_MAIN_CLASS =
+ "org.apache.hcatalog.cli.HCatCli";
+ public static final String HCAT_DEF_STORAGE_STANZA = "stored as rcfile";
+ public static final String HIVE_DELIMITERS_TO_REPLACE_PROP =
+ "sqoop.hive.delims.to.replace";
+ public static final String HIVE_DELIMITERS_REPLACEMENT_PROP =
+ "sqoop.hive.delims.replacement";
+ public static final String HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP =
+ "sqoop.hive.delims.replacement.enabled";
+ public static final String HCAT_STATIC_PARTITION_KEY_PROP =
+ "sqoop.hcat.partition.key";
+ public static final String HCAT_FIELD_POSITIONS_PROP =
+ "sqoop.hcat.field.positions";
+ public static final String DEBUG_HCAT_IMPORT_MAPPER_PROP =
+ "sqoop.hcat.debug.import.mapper";
+ public static final String DEBUG_HCAT_EXPORT_MAPPER_PROP =
+ "sqoop.hcat.debug.export.mapper";
+ private static final String HCATCMD = Shell.WINDOWS ? "hcat.cmd" : "hcat";
+ private SqoopOptions options;
+ private ConnManager connManager;
+ private String hCatTableName;
+ private String hCatDatabaseName;
+ private Configuration configuration;
+ private Job hCatJob;
+ private HCatSchema hCatOutputSchema;
+ private HCatSchema hCatPartitionSchema;
+ private HCatSchema projectedSchema;
+ private boolean configured;
+
+ private String hCatQualifiedTableName;
+ private String hCatStaticPartitionKey;
+ private List<String> hCatDynamicPartitionKeys;
+ // DB stuff
+ private String[] dbColumnNames;
+ private String dbTableName;
+ private LCKeyMap<Integer> dbColumnTypes;
+
+ private Map<String, Integer> externalColTypes;
+
+ private int[] hCatFieldPositions; // For each DB column, HCat position
+
+ private HCatSchema hCatFullTableSchema;
+ private List<String> hCatFullTableSchemaFieldNames;
+ private LCKeyMap<String> userHiveMapping;
+
+ // For testing support
+ private static Class<? extends InputFormat> inputFormatClass =
+ SqoopHCatExportFormat.class;
+
+ private static Class<? extends OutputFormat> outputFormatClass =
+ HCatOutputFormat.class;
+
+ private static Class<? extends Mapper> exportMapperClass =
+ SqoopHCatExportMapper.class;
+
+ private static Class<? extends Mapper> importMapperClass =
+ SqoopHCatImportMapper.class;
+
+ private static Class<? extends Writable> importValueClass =
+ DefaultHCatRecord.class;
+
+ private static boolean testMode = false;
+
+ static class IntArrayWritable extends ArrayWritable {
+ public IntArrayWritable() {
+ super(IntWritable.class);
+ }
+ }
+
+ /**
+ * A Map using String as key type that ignores case of its key and stores the
+ * key in lower case.
+ */
+ private static class LCKeyMap<V> extends HashMap<String, V> {
+
+ private static final long serialVersionUID = -6751510232323094216L;
+
+ @Override
+ public V put(String key, V value) {
+ return super.put(key.toLowerCase(), value);
+ }
+
+ @Override
+ public V get(Object key) {
+ return super.get(((String) key).toLowerCase());
+ }
+ }
+
+ /**
+ * A Map using String as key type that ignores case of its key and stores the
+ * key in upper case.
+ */
+ public class UCKeyMap<V> extends HashMap<String, V> {
+
+ private static final long serialVersionUID = -6751510232323094216L;
+
+ @Override
+ public V put(String key, V value) {
+ return super.put(key.toUpperCase(), value);
+ }
+
+ @Override
+ public V get(Object key) {
+ return super.get(((String) key).toUpperCase());
+ }
+ }
+
+ /**
+ * Holder class for the singleton instance. The initialization-on-demand
+ * holder idiom guarantees safe lazy creation under the JMM.
+ */
+ public static final class Holder {
+ @SuppressWarnings("synthetic-access")
+ public static final SqoopHCatUtilities INSTANCE = new SqoopHCatUtilities();
+
+ private Holder() {
+ }
+ }
+
+ public static SqoopHCatUtilities instance() {
+ return Holder.INSTANCE;
+ }
+
+ private SqoopHCatUtilities() {
+ configured = false;
+ }
+
+ public static final Log LOG = LogFactory.getLog(SqoopHCatUtilities.class
+ .getName());
+
+ public boolean isConfigured() {
+ return configured;
+ }
+
+ public void configureHCat(final SqoopOptions opts, final Job job,
+ final ConnManager connMgr, final String dbTable,
+ final Configuration config) throws IOException {
+ if (configured) {
+ LOG.info("Ignoring configuration request for HCatalog info");
+ return;
+ }
+ options = opts;
+
+ LOG.info("Configuring HCatalog specific details for job");
+
+ String home = opts.getHiveHome();
+
+ if (home == null || home.length() == 0) {
+ LOG.warn("Hive home is not set. job may fail if needed jar files "
+ + "are not found correctly. Please set HIVE_HOME in"
+ + " sqoop-env.sh or provide --hive-home option. Setting HIVE_HOME "
+ + " to " + SqoopOptions.getHiveHomeDefault());
+ }
+
+ home = opts.getHCatHome();
+ if (home == null || home.length() == 0) {
+ LOG.warn("HCatalog home is not set. job may fail if needed jar "
+ + "files are not found correctly. Please set HCAT_HOME in"
+ + " sqoop-env.sh or provide --hcatalog-home option. "
+ + " Setting HCAT_HOME to " + SqoopOptions.getHCatHomeDefault());
+ }
+ connManager = connMgr;
+ dbTableName = dbTable;
+ configuration = config;
+ hCatJob = job;
+ hCatDatabaseName = options.getHCatDatabaseName() != null ? options
+ .getHCatDatabaseName() : DEFHCATDB;
+ hCatDatabaseName = hCatDatabaseName.toLowerCase();
+
+ String optHCTabName = options.getHCatTableName();
+ hCatTableName = optHCTabName.toLowerCase();
+
+ if (!hCatTableName.equals(optHCTabName)) {
+ LOG.warn("Provided HCatalog table name " + optHCTabName
+ + " will be mapped to " + hCatTableName);
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(hCatDatabaseName);
+ sb.append('.').append(hCatTableName);
+ hCatQualifiedTableName = sb.toString();
+
+ String principalID = System
+ .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+ if (principalID != null) {
+ configuration.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
+ }
+ hCatStaticPartitionKey = options.getHivePartitionKey();
+
+ Properties userMapping = options.getMapColumnHive();
+ userHiveMapping = new LCKeyMap<String>();
+ for (Object o : userMapping.keySet()) {
+ String v = (String) userMapping.get(o);
+ userHiveMapping.put((String) o, v);
+ }
+ // Get the partition key filter if needed
+ Map<String, String> filterMap = getHCatSPFilterMap();
+ String filterStr = getHCatSPFilterStr();
+ initDBColumnNamesAndTypes();
+ if (options.doCreateHCatalogTable()) {
+ LOG.info("Creating HCatalog table " + hCatQualifiedTableName
+ + " for import");
+ createHCatTable();
+ }
+ // For serializing the schema to conf
+ HCatInputFormat hif = HCatInputFormat.setInput(hCatJob, hCatDatabaseName,
+ hCatTableName);
+ if (filterStr != null) {
+ LOG.info("Setting hCatInputFormat filter to " + filterStr);
+ hif.setFilter(filterStr);
+ }
+
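+ // The full table schema returned by HCatInputFormat lists the data
+ // columns followed by any partition columns.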
+ hCatFullTableSchema = HCatInputFormat.getTableSchema(configuration);
+ hCatFullTableSchemaFieldNames = hCatFullTableSchema.getFieldNames();
+
+ LOG.info("HCatalog full table schema fields = "
+ + Arrays.toString(hCatFullTableSchema.getFieldNames().toArray()));
+
+ if (filterMap != null) {
+ LOG.info("Setting hCatOutputFormat filter to " + filterStr);
+ }
+
+ HCatOutputFormat.setOutput(hCatJob,
+ OutputJobInfo.create(hCatDatabaseName, hCatTableName, filterMap));
+ hCatOutputSchema = HCatOutputFormat.getTableSchema(configuration);
+ List<HCatFieldSchema> hCatPartitionSchemaFields =
+ new ArrayList<HCatFieldSchema>();
+ int totalFieldsCount = hCatFullTableSchema.size();
+ int dataFieldsCount = hCatOutputSchema.size();
+ if (totalFieldsCount > dataFieldsCount) {
+ for (int i = dataFieldsCount; i < totalFieldsCount; ++i) {
+ hCatPartitionSchemaFields.add(hCatFullTableSchema.get(i));
+ }
+ }
+
+ hCatPartitionSchema = new HCatSchema(hCatPartitionSchemaFields);
+ for (HCatFieldSchema hfs : hCatPartitionSchemaFields) {
+ if (hfs.getType() != HCatFieldSchema.Type.STRING) {
+ throw new IOException("The table provided "
+ + getQualifiedHCatTableName()
+ + " uses unsupported partitioning key type for column "
+ + hfs.getName() + " : " + hfs.getTypeString() + ". Only string "
+ + "fields are allowed in partition columns in HCatalog");
+ }
+
+ }
+ LOG.info("HCatalog table partitioning key fields = "
+ + Arrays.toString(hCatPartitionSchema.getFieldNames().toArray()));
+
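+ // Build the projected schema: the HCat fields that back the database
+ // columns, excluding any static partition key.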
+ List<HCatFieldSchema> outputFieldList = new ArrayList<HCatFieldSchema>();
+ for (String col : dbColumnNames) {
+ HCatFieldSchema hfs = hCatFullTableSchema.get(col);
+ if (hfs == null) {
+ throw new IOException("Database column " + col + " not found in "
+ + " hcatalog table.");
+ }
+ if (hCatStaticPartitionKey != null
+ && hCatStaticPartitionKey.equals(col)) {
+ continue;
+ }
+ outputFieldList.add(hCatFullTableSchema.get(col));
+ }
+
+ projectedSchema = new HCatSchema(outputFieldList);
+
+ LOG.info("HCatalog projected schema fields = "
+ + Arrays.toString(projectedSchema.getFieldNames().toArray()));
+
+ validateStaticPartitionKey();
+ validateHCatTableFieldTypes();
+
+ HCatOutputFormat.setSchema(configuration, hCatFullTableSchema);
+
+ addJars(hCatJob, options);
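+ // Copy the mapper debug flags from JVM system properties into the job
+ // configuration so the tasks can see them.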
+ config.setBoolean(DEBUG_HCAT_IMPORT_MAPPER_PROP,
+ Boolean.getBoolean(DEBUG_HCAT_IMPORT_MAPPER_PROP));
+ config.setBoolean(DEBUG_HCAT_EXPORT_MAPPER_PROP,
+ Boolean.getBoolean(DEBUG_HCAT_EXPORT_MAPPER_PROP));
+ configured = true;
+ }
+
+ public void validateDynamicPartitionKeysMapping() throws IOException {
+ // Now validate all partition columns are in the database column list
+ StringBuilder missingKeys = new StringBuilder();
+
+ for (String s : hCatDynamicPartitionKeys) {
+ boolean found = false;
+ for (String c : dbColumnNames) {
+ if (s.equals(c)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ missingKeys.append(',').append(s);
+ }
+ }
+ if (missingKeys.length() > 0) {
+ throw new IOException("Dynamic partition keys are not "
+ + "present in the database columns. Missing keys = "
+ + missingKeys.substring(1));
+ }
+ }
+
+ public void validateHCatTableFieldTypes() throws IOException {
+ StringBuilder sb = new StringBuilder();
+ boolean hasComplexFields = false;
+ for (HCatFieldSchema hfs : projectedSchema.getFields()) {
+ if (hfs.isComplex()) {
+ sb.append(',').append(hfs.getName());
+ hasComplexFields = true;
+ }
+ }
+
+ if (hasComplexFields) {
+ String unsupportedFields = sb.substring(1);
+ throw new IOException("The HCatalog table provided "
+ + getQualifiedHCatTableName() + " has complex field types ("
+ + unsupportedFields + "). They are currently not supported");
+ }
+
+ }
+
+ /**
+ * Get the column names to import.
+ */
+ private void initDBColumnNamesAndTypes() throws IOException {
+ String[] colNames = options.getColumns();
+ if (null == colNames) {
+ if (null != externalColTypes) {
+ // Test-injection column mapping. Extract the column names from it.
+ ArrayList<String> keyList = new ArrayList<String>();
+ for (String key : externalColTypes.keySet()) {
+ keyList.add(key);
+ }
+ colNames = keyList.toArray(new String[keyList.size()]);
+ } else if (null != dbTableName) {
+ colNames = connManager.getColumnNames(dbTableName);
+ } else if (options.getCall() != null) {
+ // Read procedure arguments from metadata
+ colNames = connManager.getColumnNamesForProcedure(this.options
+ .getCall());
+ } else {
+ colNames = connManager.getColumnNamesForQuery(options.getSqlQuery());
+ }
+ }
+
+ dbColumnNames = new String[colNames.length];
+
+ for (int i = 0; i < colNames.length; ++i) {
+ dbColumnNames[i] = colNames[i].toLowerCase();
+ }
+
+ LCKeyMap<Integer> colTypes = new LCKeyMap<Integer>();
+ if (externalColTypes != null) { // Use pre-defined column types.
+ colTypes.putAll(externalColTypes);
+ } else { // Get these from the database.
+ if (dbTableName != null) {
+ colTypes.putAll(connManager.getColumnTypes(dbTableName));
+ } else if (options.getCall() != null) {
+ // Read procedure arguments from metadata
+ colTypes.putAll(connManager.getColumnTypesForProcedure(this.options
+ .getCall()));
+ } else {
+ colTypes.putAll(connManager.getColumnTypesForQuery(options
+ .getSqlQuery()));
+ }
+ }
+
+ if (options.getColumns() == null) {
+ dbColumnTypes = colTypes;
+ } else {
+ dbColumnTypes = new LCKeyMap<Integer>();
+ // prune column types based on projection
+ for (String col : dbColumnNames) {
+ Integer type = colTypes.get(col);
+ if (type == null) {
+ throw new IOException("Projected column " + col
+ + " not in list of columns from database");
+ }
+ dbColumnTypes.put(col, type);
+ }
+ }
+ LOG.info("Database column names projected : "
+ + Arrays.toString(dbColumnNames));
+ LOG.info("Database column name - type map :\n\tNames: "
+ + Arrays.toString(dbColumnTypes.keySet().toArray()) + "\n\tTypes : "
+ + Arrays.toString(dbColumnTypes.values().toArray()));
+ }
+
+ private void createHCatTable() throws IOException {
+ StringBuilder sb = new StringBuilder();
+ sb.append("create table ").
+ append(hCatDatabaseName).append('.');
+ sb.append(hCatTableName).append(" (\n\t");
+ boolean first = true;
+ for (String col : dbColumnNames) {
+ String type = userHiveMapping.get(col);
+ if (type == null) {
+ type = connManager.toHCatType(dbColumnTypes.get(col));
+ }
+ if (hCatStaticPartitionKey != null
+ && col.equals(hCatStaticPartitionKey)) {
+ continue;
+ }
+ if (first) {
+ first = false;
+ } else {
+ sb.append(",\n\t");
+ }
+ sb.append(col).append(' ').append(type);
+ }
+ sb.append(")\n");
+ if (hCatStaticPartitionKey != null) {
+ sb.append("partitioned by (\n\t");
+ sb.append(hCatStaticPartitionKey).append(" string)\n");
+ }
+ String storageStanza = options.getHCatStorageStanza();
+ if (storageStanza == null) {
+ sb.append(HCAT_DEF_STORAGE_STANZA);
+ } else {
+ sb.append(storageStanza);
+ }
+ String createStatement = sb.toString();
+ LOG.info("HCatalog Create table statement: \n\n" + createStatement);
+ // Except in tests, always launch HCat as an external program so that
+ // the inline Hive CLI does not interfere with Sqoop's logging.
+ // We prefer the external HCat client.
+ launchHCatCli(createStatement);
+ }
+
+
+ private void validateFieldAndColumnMappings() throws IOException {
+ // Check that all explicitly mapped columns are present
+ for (Object column : userHiveMapping.keySet()) {
+ boolean found = false;
+ for (String c : dbColumnNames) {
+ if (c.equalsIgnoreCase((String) column)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ throw new IllegalArgumentException("Column " + column
+ + " not found while mapping database columns to hcatalog columns");
+ }
+ }
+
+ hCatFieldPositions = new int[dbColumnNames.length];
+
+ Arrays.fill(hCatFieldPositions, -1);
+
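+ // For each database column, resolve its HCat type (user mapping first,
+ // then the connection manager default) and locate it in the full table
+ // schema.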
+ for (int indx = 0; indx < dbColumnNames.length; ++indx) {
+ boolean userMapped = false;
+ String col = dbColumnNames[indx];
+ Integer colType = dbColumnTypes.get(col);
+ String hCatColType = userHiveMapping.get(col);
+ if (hCatColType == null) {
+ LOG.debug("No user defined type mapping for HCatalog field " + col);
+ hCatColType = connManager.toHCatType(colType);
+ } else {
+ LOG.debug("Found type mapping for HCatalog filed " + col);
+ userMapped = true;
+ }
+ if (null == hCatColType) {
+ throw new IOException("HCat does not support the SQL type for column "
+ + col);
+ }
+
+ boolean found = false;
+ for (String tf : hCatFullTableSchemaFieldNames) {
+ if (tf.equals(col)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ throw new IOException("Database column " + col + " not found in "
+ + "hcatalog table schema or partition schema");
+ }
+ if (!userMapped) {
+ HCatFieldSchema hCatFS = hCatFullTableSchema.get(col);
+ if (!hCatFS.getTypeString().equals(hCatColType)) {
+ LOG.warn("The HCatalog field " + col + " has type "
+ + hCatFS.getTypeString() + ". Expected = " + hCatColType
+ + " based on database column type : " + sqlTypeString(colType));
+ LOG.warn("The Sqoop job can fail if types are not "
+ + " assignment compatible");
+ }
+ }
+
+ if (HiveTypes.isHiveTypeImprovised(colType)) {
+ LOG.warn("Column " + col + " had to be cast to a less precise type "
+ + hCatColType + " in hcatalog");
+ }
+ hCatFieldPositions[indx] = hCatFullTableSchemaFieldNames.indexOf(col);
+ if (hCatFieldPositions[indx] < 0) {
+ throw new IOException("The HCatalog field " + col
+ + " could not be found");
+ }
+ }
+
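+ // Serialize the per-column HCat field positions for later lookup from
+ // the job configuration.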
+ IntWritable[] positions = new IntWritable[hCatFieldPositions.length];
+ for (int i = 0; i < hCatFieldPositions.length; ++i) {
+ positions[i] = new IntWritable(hCatFieldPositions[i]);
+ }
+
+ DefaultStringifier.storeArray(configuration, positions,
+ HCAT_FIELD_POSITIONS_PROP);
+ }
+
+ private String getHCatSPFilterStr() {
+ if (hCatStaticPartitionKey != null) {
+ StringBuilder filter = new StringBuilder();
+ filter.append(options.getHivePartitionKey()).append('=').append('\'')
+ .append(options.getHivePartitionValue()).append('\'');
+ return filter.toString();
+ }
+ return null;
+ }
+
+ private Map<String, String> getHCatSPFilterMap() {
+ if (hCatStaticPartitionKey != null) {
+ Map<String, String> filter = new HashMap<String, String>();
+ filter
+ .put(options.getHivePartitionKey(), options.getHivePartitionValue());
+ return filter;
+ }
+ return null;
+ }
+
+ private void validateStaticPartitionKey() throws IOException {
+ // check the static partition key from command line
+ List<HCatFieldSchema> partFields = hCatPartitionSchema.getFields();
+
+ if (hCatStaticPartitionKey != null) {
+ boolean found = false;
+ for (HCatFieldSchema hfs : partFields) {
+ if (hfs.getName().equals(hCatStaticPartitionKey)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new IOException("The provided hive partition key "
+ + hCatStaticPartitionKey + " is not part of the partition"
+ + " keys for table " + getQualifiedHCatTableName());
+ }
+ }
+ hCatDynamicPartitionKeys = new ArrayList<String>();
+ hCatDynamicPartitionKeys.addAll(hCatPartitionSchema.getFieldNames());
+ if (hCatStaticPartitionKey != null) {
+ hCatDynamicPartitionKeys.remove(hCatStaticPartitionKey);
+ }
+ configuration.set(HCAT_STATIC_PARTITION_KEY_PROP,
+ hCatStaticPartitionKey == null ? "" : hCatStaticPartitionKey);
+ }
+
+ public static void configureImportOutputFormat(SqoopOptions opts, Job job,
+ ConnManager connMgr, String dbTable, Configuration config)
+ throws IOException {
+
+ LOG.info("Configuring HCatalog for import job");
+ SqoopHCatUtilities.instance().configureHCat(opts, job, connMgr, dbTable,
+ job.getConfiguration());
+ LOG.info("Validating dynamic partition keys");
+ SqoopHCatUtilities.instance().validateFieldAndColumnMappings();
+ SqoopHCatUtilities.instance().validateDynamicPartitionKeysMapping();
+ job.setOutputFormatClass(getOutputFormatClass());
+ IntWritable[] delimChars;
+ String hiveReplacement;
+ LOG.debug("Hive delimiters will be fixed during import");
+ DelimiterSet delims = opts.getOutputDelimiters();
+ if (!opts.explicitOutputDelims()) {
+ delims = DelimiterSet.HIVE_DELIMITERS;
+ }
+ delimChars = new IntWritable[] {
+ new IntWritable(delims.getFieldsTerminatedBy()),
+ new IntWritable(delims.getLinesTerminatedBy()),
+ new IntWritable(delims.getEnclosedBy()),
+ new IntWritable(delims.getEscapedBy()),
+ new IntWritable(delims.isEncloseRequired() ? 1 : 0), };
+ hiveReplacement = opts.getHiveDelimsReplacement();
+ if (hiveReplacement == null) {
+ hiveReplacement = "";
+ }
+
+ LOG.debug("Setting hive delimiters information");
+ DefaultStringifier.storeArray(config, delimChars,
+ HIVE_DELIMITERS_TO_REPLACE_PROP);
+ config.set(HIVE_DELIMITERS_REPLACEMENT_PROP, hiveReplacement);
+ if (opts.doHiveDropDelims() || opts.getHiveDelimsReplacement() != null) {
+ LOG.debug("Enabling hive delimter replacement");
+ config.set(HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP, "true");
+ } else {
+ LOG.debug("Disabling hive delimter replacement");
+ config.set(HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP, "false");
+ }
+ }
+
+ public static void configureExportInputFormat(SqoopOptions opts, Job job,
+ ConnManager connMgr, String dbTable, Configuration config)
+ throws IOException {
+
+ LOG.info("Configuring HCatalog for export job");
+ SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
+ hCatUtils
+ .configureHCat(opts, job, connMgr, dbTable, job.getConfiguration());
+ job.setInputFormatClass(getInputFormatClass());
+ Map<String, Integer> dbColTypes = hCatUtils.getDbColumnTypes();
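+ // Record both the Java and the SQL type of every database column in the
+ // configuration for the export job.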
+ MapWritable columnTypesJava = new MapWritable();
+ for (Map.Entry<String, Integer> e : dbColTypes.entrySet()) {
+ Text columnName = new Text(e.getKey());
+ Text columnText = new Text(connMgr.toJavaType(dbTable, e.getKey(),
+ e.getValue()));
+ columnTypesJava.put(columnName, columnText);
+ }
+ MapWritable columnTypesSql = new MapWritable();
+ for (Map.Entry<String, Integer> e : dbColTypes.entrySet()) {
+ Text columnName = new Text(e.getKey());
+ IntWritable sqlType = new IntWritable(e.getValue());
+ columnTypesSql.put(columnName, sqlType);
+ }
+ DefaultStringifier.store(config, columnTypesJava,
+ SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA);
+ DefaultStringifier.store(config, columnTypesSql,
+ SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL);
+ }
+
+ /**
+ * Add the Hive and HCatalog jar files to local classpath and dist cache.
+ * @throws IOException
+ */
+ public static void addJars(Job job, SqoopOptions options) throws IOException {
+
+ if (isLocalJobTracker(job)) {
+ LOG.info("Not adding hcatalog jars to distributed cache in local mode");
+ return;
+ }
+ Configuration conf = job.getConfiguration();
+ String hiveHome = null;
+ String hCatHome = null;
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (options != null) {
+ hiveHome = options.getHiveHome();
+ }
+ if (hiveHome == null) {
+ hiveHome = SqoopOptions.getHiveHomeDefault();
+ }
+ if (options != null) {
+ hCatHome = options.getHCatHome();
+ }
+ if (hCatHome == null) {
+ hCatHome = SqoopOptions.getHCatHomeDefault();
+ }
+ LOG.info("HCatalog job : Hive Home = " + hiveHome);
+ LOG.info("HCatalog job: HCatalog Home = " + hCatHome);
+
+ conf.addResource(hiveHome + HIVESITEXMLPATH);
+
+ // Add these to the 'tmpjars' array, which the MR JobSubmitter
+ // will upload to HDFS and put in the DistributedCache libjars.
+ List<String> libDirs = new ArrayList<String>();
+ libDirs.add(hCatHome + File.separator + HCATSHAREDIR);
+ libDirs.add(hCatHome + File.separator + DEFLIBDIR);
+ libDirs.add(hiveHome + File.separator + DEFLIBDIR);
+ Set<String> localUrls = new HashSet<String>();
+ // Add any libjars already specified
+ localUrls
+ .addAll(conf
+ .getStringCollection(
+ ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM));
+ for (String dir : libDirs) {
+ LOG.info("Adding jar files under " + dir + " to distributed cache");
+ addDirToCache(new File(dir), fs, localUrls, false);
+ }
+
+ // Recursively add all hcatalog storage handler jars
+ // The HBase storage handler is getting deprecated post Hive+HCat merge
+ String hCatStorageHandlerDir = hCatHome + File.separator
+ + "share/hcatalog/storage-handlers";
+ LOG.info("Adding jar files under " + hCatStorageHandlerDir
+ + " to distributed cache (recursively)");
+
+ addDirToCache(new File(hCatStorageHandlerDir), fs, localUrls, true);
+
+ String tmpjars = conf
+ .get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM);
+ StringBuilder sb = new StringBuilder(1024);
+ if (null != tmpjars) {
+ sb.append(tmpjars);
+ sb.append(",");
+ }
+ sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
+ conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM, sb.toString());
+ }
+
+ /**
+ * Add the .jar elements of a directory to the DCache classpath, optionally
+ * recursively.
+ */
+ private static void addDirToCache(File dir, FileSystem fs,
+ Set<String> localUrls, boolean recursive) {
+ if (dir == null) {
+ return;
+ }
+
+ File[] fileList = dir.listFiles();
+
+ if (fileList == null) {
+ LOG.warn("No files under " + dir
+ + " to add to distributed cache for hcatalog job");
+ return;
+ }
+
+ for (File libFile : fileList) {
+ if (libFile.exists() && !libFile.isDirectory()
+ && libFile.getName().endsWith(".jar")) {
+ Path p = new Path(libFile.toString());
+ if (libFile.canRead()) {
+ String qualified = p.makeQualified(fs).toString();
+ LOG.info("Adding to job classpath: " + qualified);
+ localUrls.add(qualified);
+ } else {
+ LOG.warn("Ignoring unreadable file " + libFile);
+ }
+ }
+ if (recursive && libFile.isDirectory()) {
+ addDirToCache(libFile, fs, localUrls, recursive);
+ }
+ }
+ }
+
+ public static boolean isHadoop1() {
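+ // Hadoop 1.x releases report versions of the form 0.20.x or 1.x.y.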
+ String version = org.apache.hadoop.util.VersionInfo.getVersion();
+ if (version.matches("\\b0\\.20\\..+\\b")
+ || version.matches("\\b1\\.\\d\\.\\d")) {
+ return true;
+ }
+ return false;
+ }
+
+ public static boolean isLocalJobTracker(Job job) {
+ Configuration conf = job.getConfiguration();
+ // If framework is set to YARN, then we can't be running in local mode
+ if ("yarn".equalsIgnoreCase(conf
+ .get(ConfigurationConstants.PROP_MAPREDUCE_FRAMEWORK_NAME))) {
+ return false;
+ }
+ String jtAddr = conf
+ .get(ConfigurationConstants.PROP_MAPRED_JOB_TRACKER_ADDRESS);
+ String jtAddr2 = conf
+ .get(ConfigurationConstants.PROP_MAPREDUCE_JOB_TRACKER_ADDRESS);
+ return (jtAddr != null && jtAddr.equals("local"))
+ || (jtAddr2 != null && jtAddr2.equals("local"));
+ }
+
+ public void invokeOutputCommitterForLocalMode(Job job) throws IOException {
+ if (isLocalJobTracker(job) && isHadoop1()) {
+ LOG.info("Explicitly committing job in local mode");
+ HCatHadoopShims.Instance.get().commitJob(new HCatOutputFormat(), job);
+ }
+ }
+
+ public void launchHCatCli(String cmdLine)
+ throws IOException {
+ String tmpFileName = null;
+
+
+ String tmpDir = System.getProperty("java.io.tmpdir");
+ if (options != null) {
+ tmpDir = options.getTempDir();
+ }
+ tmpFileName =
+ new File(tmpDir, "hcat-script-"
+ + System.currentTimeMillis()).getAbsolutePath();
+
+ writeHCatScriptFile(tmpFileName, cmdLine);
+ // Create the argv for the HCatalog Cli Driver.
+ String[] argArray = new String[2];
+ argArray[0] = "-f";
+ argArray[1] = tmpFileName;
+ String argLine = StringUtils.join(",", Arrays.asList(argArray));
+
+ if (testMode) {
+ LOG.debug("Executing HCatalog CLI in-process with " + argLine);
+ executeHCatProgramInProcess(argArray);
+ } else {
+ LOG.info("Executing external HCatalog CLI process with args :" + argLine);
+ executeExternalHCatProgram(Executor.getCurEnvpStrings(), argArray);
+ }
+ }
+
+ public void writeHCatScriptFile(String fileName, String contents)
+ throws IOException {
+ BufferedWriter w = null;
+ try {
+ FileOutputStream fos = new FileOutputStream(fileName);
+ w = new BufferedWriter(new OutputStreamWriter(fos));
+ w.write(contents, 0, contents.length());
+ } catch (IOException ioe) {
+ LOG.error("Error writing HCatalog load-in script", ioe);
+ throw ioe;
+ } finally {
+ if (null != w) {
+ try {
+ w.close();
+ } catch (IOException ioe) {
+ LOG.warn("IOException closing stream to HCatalog script", ioe);
+ }
+ }
+ }
+ }
+
+ /**
+ * Execute HCat via an external 'bin/hcat' process.
+ * @param env
+ * the environment strings to pass to any subprocess.
+ * @throws IOException
+ * if HCatalog did not exit successfully.
+ */
+ public void executeExternalHCatProgram(List<String> env, String[] cmdLine)
+ throws IOException {
+ // run HCat command with the given args
+ String hCatProgram = getHCatPath();
+ ArrayList<String> args = new ArrayList<String>();
+ args.add(hCatProgram);
+ if (cmdLine != null && cmdLine.length > 0) {
+ for (String s : cmdLine) {
+ args.add(s);
+ }
+ }
+ LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+ int ret = Executor.exec(args.toArray(new String[0]),
+ env.toArray(new String[0]), logSink, logSink);
+ if (0 != ret) {
+ throw new IOException("HCat exited with status " + ret);
+ }
+ }
+
+ public void executeHCatProgramInProcess(String[] argv) throws IOException {
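+ // Invoke HCatCli.main() via reflection; the SubprocessSecurityManager
+ // traps System.exit() so the embedded CLI cannot terminate the Sqoop JVM.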
+ SubprocessSecurityManager subprocessSM = null;
+
+ try {
+ Class<?> cliDriverClass = Class.forName(HCAT_CLI_MAIN_CLASS);
+ subprocessSM = new SubprocessSecurityManager();
+ subprocessSM.install();
+ Method mainMethod = cliDriverClass.getMethod("main", argv.getClass());
+ mainMethod.invoke(null, (Object) argv);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("HCatalog class not found", cnfe);
+ } catch (NoSuchMethodException nsme) {
+ throw new IOException("Could not access HCatCli.main()", nsme);
+ } catch (IllegalAccessException iae) {
+ throw new IOException("Could not access HatCli.main()", iae);
+ } catch (InvocationTargetException ite) {
+ // This may have been the ExitSecurityException triggered by the
+ // SubprocessSecurityManager.
+ Throwable cause = ite.getCause();
+ if (cause instanceof ExitSecurityException) {
+ ExitSecurityException ese = (ExitSecurityException) cause;
+ int status = ese.getExitStatus();
+ if (status != 0) {
+ throw new IOException("HCatCli exited with status=" + status);
+ }
+ } else {
+ throw new IOException("Exception thrown from HCatCli", ite);
+ }
+ } finally {
+ if (null != subprocessSM) {
+ subprocessSM.uninstall();
+ }
+ }
+ }
+
+ /**
+ * @return the filename of the hcat executable to run to do the import
+ */
+ public String getHCatPath() {
+ String hCatHome = null;
+ if (options == null) {
+ hCatHome = SqoopOptions.getHCatHomeDefault();
+ } else {
+ hCatHome = options.getHCatHome();
+ }
+
+ if (null == hCatHome) {
+ return null;
+ }
+
+ Path p = new Path(hCatHome);
+ p = new Path(p, "bin");
+ p = new Path(p, HCATCMD);
+ String hCatBinStr = p.toString();
+ if (new File(hCatBinStr).canExecute()) {
+ return hCatBinStr;
+ } else {
+ return null;
+ }
+ }
+
+ public static boolean isTestMode() {
+ return testMode;
+ }
+
+ public static void setTestMode(boolean mode) {
+ testMode = mode;
+ }
+
+ public static Class<? extends InputFormat> getInputFormatClass() {
+ return inputFormatClass;
+ }
+
+ public static Class<? extends OutputFormat> getOutputFormatClass() {
+ return outputFormatClass;
+ }
+
+ public static void setInputFormatClass(Class<? extends InputFormat> clz) {
+ inputFormatClass = clz;
+ }
+
+ public static void setOutputFormatClass(Class<? extends OutputFormat> clz) {
+ outputFormatClass = clz;
+ }
+
+ public static Class<? extends Mapper> getImportMapperClass() {
+ return importMapperClass;
+ }
+
+ public static Class<? extends Mapper> getExportMapperClass() {
+ return exportMapperClass;
+ }
+
+ public static void setExportMapperClass(Class<? extends Mapper> clz) {
+ exportMapperClass = clz;
+ }
+
+ public static void setImportMapperClass(Class<? extends Mapper> clz) {
+ importMapperClass = clz;
+ }
+
+ public static Class<? extends Writable> getImportValueClass() {
+ return importValueClass;
+ }
+
+ public static void setImportValueClass(Class<? extends Writable> clz) {
+ importValueClass = clz;
+ }
+
+ /**
+ * Set the column type map to be used. (dependency injection for testing; not
+ * used in production.)
+ */
+ public void setColumnTypes(Map<String, Integer> colTypes) {
+ externalColTypes = colTypes;
+ LOG.debug("Using test-controlled type map");
+ }
+
+ public String getDatabaseTable() {
+ return dbTableName;
+ }
+
+ public String getHCatTableName() {
+ return hCatTableName;
+ }
+
+ public String getHCatDatabaseName() {
+ return hCatDatabaseName;
+ }
+
+ public String getQualifiedHCatTableName() {
+ return hCatQualifiedTableName;
+ }
+
+ public List<String> getHCatDynamicPartitionKeys() {
+ return hCatDynamicPartitionKeys;
+ }
+
+ public String getHCatStaticPartitionKey() {
+ return hCatStaticPartitionKey;
+ }
+
+ public String[] getDBColumnNames() {
+ return dbColumnNames;
+ }
+
+ public HCatSchema getHCatOutputSchema() {
+ return hCatOutputSchema;
+ }
+
+ public void setHCatOutputSchema(HCatSchema schema) {
+ hCatOutputSchema = schema;
+ }
+
+ public HCatSchema getHCatPartitionSchema() {
+ return hCatPartitionSchema;
+ }
+
+ public void setHCatPartitionSchema(HCatSchema schema) {
+ hCatPartitionSchema = schema;
+ }
+
+ public void setHCatStaticPartitionKey(String key) {
+ hCatStaticPartitionKey = key;
+ }
+
+ public void setHCatDynamicPartitionKeys(List<String> keys) {
+ hCatDynamicPartitionKeys = keys;
+ }
+
+ public String[] getDbColumnNames() {
+ return dbColumnNames;
+ }
+
+ public void setDbColumnNames(String[] names) {
+ dbColumnNames = names;
+ }
+
+ public Map<String, Integer> getDbColumnTypes() {
+ return dbColumnTypes;
+ }
+
+ public void setDbColumnTypes(Map<String, Integer> types) {
+ dbColumnTypes.putAll(types);
+ }
+
+ public String gethCatTableName() {
+ return hCatTableName;
+ }
+
+ public String gethCatDatabaseName() {
+ return hCatDatabaseName;
+ }
+
+ public String gethCatQualifiedTableName() {
+ return hCatQualifiedTableName;
+ }
+
+ public void setConfigured(boolean value) {
+ configured = value;
+ }
+
+ public static String sqlTypeString(int sqlType) {
+ switch (sqlType) {
+ case Types.BIT:
+ return "BIT";
+ case Types.TINYINT:
+ return "TINYINT";
+ case Types.SMALLINT:
+ return "SMALLINT";
+ case Types.INTEGER:
+ return "INTEGER";
+ case Types.BIGINT:
+ return "BIGINT";
+ case Types.FLOAT:
+ return "FLOAT";
+ case Types.REAL:
+ return "REAL";
+ case Types.DOUBLE:
+ return "DOUBLE";
+ case Types.NUMERIC:
+ return "NUMERIC";
+ case Types.DECIMAL:
+ return "DECIMAL";
+ case Types.CHAR:
+ return "CHAR";
+ case Types.VARCHAR:
+ return "VARCHAR";
+ case Types.LONGVARCHAR:
+ return "LONGVARCHAR";
+ case Types.DATE:
+ return "DATE";
+ case Types.TIME:
+ return "TIME";
+ case Types.TIMESTAMP:
+ return "TIMESTAMP";
+ case Types.BINARY:
+ return "BINARY";
+ case Types.VARBINARY:
+ return "VARBINARY";
+ case Types.LONGVARBINARY:
+ return "LONGVARBINARY";
+ case Types.NULL:
+ return "NULL";
+ case Types.OTHER:
+ return "OTHER";
+ case Types.JAVA_OBJECT:
+ return "JAVA_OBJECT";
+ case Types.DISTINCT:
+ return "DISTINCT";
+ case Types.STRUCT:
+ return "STRUCT";
+ case Types.ARRAY:
+ return "ARRAY";
+ case Types.BLOB:
+ return "BLOB";
+ case Types.CLOB:
+ return "CLOB";
+ case Types.REF:
+ return "REF";
+ case Types.DATALINK:
+ return "DATALINK";
+ case Types.BOOLEAN:
+ return "BOOLEAN";
+ case Types.ROWID:
+ return "ROWID";
+ case Types.NCHAR:
+ return "NCHAR";
+ case Types.NVARCHAR:
+ return "NVARCHAR";
+ case Types.LONGNVARCHAR:
+ return "LONGNVARCHAR";
+ case Types.NCLOB:
+ return "NCLOB";
+ case Types.SQLXML:
+ return "SQLXML";
+ default:
+ return "<UNKNOWN>";
+ }
+ }
+}
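
A minimal sketch of how an import job might wire in the utilities above; the
SqoopOptions "opts", the ConnManager "connMgr" and the table name "employees"
are illustrative and assumed to be prepared elsewhere:

    Job job = new Job(new Configuration());
    // Wires HCatOutputFormat, schemas and delimiter handling into the job.
    SqoopHCatUtilities.configureImportOutputFormat(opts, job, connMgr,
        "employees", job.getConfiguration());
    job.setMapperClass(SqoopHCatUtilities.getImportMapperClass());
    job.setMapOutputValueClass(SqoopHCatUtilities.getImportValueClass());
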
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 42f521f..01a55e5 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -108,6 +108,13 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
public static final String HIVE_PARTITION_VALUE_ARG = "hive-partition-value";
public static final String CREATE_HIVE_TABLE_ARG =
"create-hive-table";
+ public static final String HCATALOG_TABLE_ARG = "hcatalog-table";
+ public static final String HCATALOG_DATABASE_ARG = "hcatalog-database";
+ public static final String CREATE_HCATALOG_TABLE_ARG =
+ "create-hcatalog-table";
+ public static final String HCATALOG_STORAGE_STANZA_ARG =
+ "hcatalog-storage-stanza";
+ public static final String HCATALOG_HOME_ARG = "hcatalog-home";
public static final String MAPREDUCE_JOB_NAME = "mapreduce-job-name";
public static final String NUM_MAPPERS_ARG = "num-mappers";
public static final String NUM_MAPPERS_SHORT_ARG = "m";
@@ -488,6 +495,66 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
return hiveOpts;
}
+ /**
+ * @return options governing interaction with HCatalog.
+ */
+ protected RelatedOptions getHCatalogOptions() {
+ RelatedOptions hCatOptions = new RelatedOptions("HCatalog arguments");
+ hCatOptions.addOption(OptionBuilder
+ .hasArg()
+ .withDescription("HCatalog table name")
+ .withLongOpt(HCATALOG_TABLE_ARG)
+ .create());
+ hCatOptions.addOption(OptionBuilder
+ .hasArg()
+ .withDescription("HCatalog database name")
+ .withLongOpt(HCATALOG_DATABASE_ARG)
+ .create());
+
+ hCatOptions.addOption(OptionBuilder.withArgName("dir")
+ .hasArg().withDescription("Override $HIVE_HOME")
+ .withLongOpt(HIVE_HOME_ARG)
+ .create());
+ hCatOptions.addOption(OptionBuilder.withArgName("hdir")
+ .hasArg().withDescription("Override $HCAT_HOME")
+ .withLongOpt(HCATALOG_HOME_ARG)
+ .create());
+ hCatOptions.addOption(OptionBuilder.withArgName("partition-key")
+ .hasArg()
+ .withDescription("Sets the partition key to use when importing to hive")
+ .withLongOpt(HIVE_PARTITION_KEY_ARG)
+ .create());
+ hCatOptions.addOption(OptionBuilder.withArgName("partition-value")
+ .hasArg()
+ .withDescription("Sets the partition value to use when importing "
+ + "to hive")
+ .withLongOpt(HIVE_PARTITION_VALUE_ARG)
+ .create());
+ hCatOptions.addOption(OptionBuilder
+ .hasArg()
+ .withDescription("Override mapping for specific column to hive"
+ + " types.")
+ .withLongOpt(MAP_COLUMN_HIVE)
+ .create());
+
+ return hCatOptions;
+ }
+
+ protected RelatedOptions getHCatImportOnlyOptions() {
+ RelatedOptions hCatOptions = new RelatedOptions(
+ "HCatalog import specific options");
+ hCatOptions.addOption(OptionBuilder
+ .withDescription("Create HCatalog before import")
+ .withLongOpt(CREATE_HCATALOG_TABLE_ARG)
+ .create());
+ hCatOptions.addOption(OptionBuilder
+ .hasArg()
+ .withDescription("HCatalog storage stanza for table creation")
+ .withLongOpt(HCATALOG_STORAGE_STANZA_ARG)
+ .create());
+ return hCatOptions;
+ }
+
/**
* @return options governing output format delimiters
*/
@@ -826,7 +893,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
out.setHiveTableName(in.getOptionValue(HIVE_TABLE_ARG));
}
- if(in.hasOption(HIVE_DATABASE_ARG)) {
+ if (in.hasOption(HIVE_DATABASE_ARG)) {
out.setHiveDatabaseName(in.getOptionValue(HIVE_DATABASE_ARG));
}
@@ -852,38 +919,79 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
}
}
+ protected void applyHCatOptions(CommandLine in, SqoopOptions out) {
+ if (in.hasOption(HCATALOG_TABLE_ARG)) {
+ out.setHCatTableName(in.getOptionValue(HCATALOG_TABLE_ARG));
+ }
+
+ if (in.hasOption(HCATALOG_DATABASE_ARG)) {
+ out.setHCatDatabaseName(in.getOptionValue(HCATALOG_DATABASE_ARG));
+ }
+
+ if (in.hasOption(HCATALOG_STORAGE_STANZA_ARG)) {
+ out.setHCatStorageStanza(in.getOptionValue(HCATALOG_STORAGE_STANZA_ARG));
+ }
+
+ if (in.hasOption(CREATE_HCATALOG_TABLE_ARG)) {
+ out.setCreateHCatalogTable(true);
+ }
+
+ if (in.hasOption(HCATALOG_HOME_ARG)) {
+ out.setHCatHome(in.getOptionValue(HCATALOG_HOME_ARG));
+ }
+
+ // Allow some of the hive options also
+
+ if (in.hasOption(HIVE_HOME_ARG)) {
+ out.setHiveHome(in.getOptionValue(HIVE_HOME_ARG));
+ }
+
+ if (in.hasOption(HIVE_PARTITION_KEY_ARG)) {
+ out.setHivePartitionKey(in.getOptionValue(HIVE_PARTITION_KEY_ARG));
+ }
+
+ if (in.hasOption(HIVE_PARTITION_VALUE_ARG)) {
+ out.setHivePartitionValue(in.getOptionValue(HIVE_PARTITION_VALUE_ARG));
+ }
+
+ if (in.hasOption(MAP_COLUMN_HIVE)) {
+ out.setMapColumnHive(in.getOptionValue(MAP_COLUMN_HIVE));
+ }
+ }
+
+
protected void applyOutputFormatOptions(CommandLine in, SqoopOptions out)
throws InvalidOptionsException {
if (in.hasOption(FIELDS_TERMINATED_BY_ARG)) {
out.setFieldsTerminatedBy(SqoopOptions.toChar(
in.getOptionValue(FIELDS_TERMINATED_BY_ARG)));
- out.setExplicitDelims(true);
+ out.setExplicitOutputDelims(true);
}
if (in.hasOption(LINES_TERMINATED_BY_ARG)) {
out.setLinesTerminatedBy(SqoopOptions.toChar(
in.getOptionValue(LINES_TERMINATED_BY_ARG)));
- out.setExplicitDelims(true);
+ out.setExplicitOutputDelims(true);
}
if (in.hasOption(OPTIONALLY_ENCLOSED_BY_ARG)) {
out.setEnclosedBy(SqoopOptions.toChar(
in.getOptionValue(OPTIONALLY_ENCLOSED_BY_ARG)));
out.setOutputEncloseRequired(false);
- out.setExplicitDelims(true);
+ out.setExplicitOutputDelims(true);
}
if (in.hasOption(ENCLOSED_BY_ARG)) {
out.setEnclosedBy(SqoopOptions.toChar(
in.getOptionValue(ENCLOSED_BY_ARG)));
out.setOutputEncloseRequired(true);
- out.setExplicitDelims(true);
+ out.setExplicitOutputDelims(true);
}
if (in.hasOption(ESCAPED_BY_ARG)) {
out.setEscapedBy(SqoopOptions.toChar(
in.getOptionValue(ESCAPED_BY_ARG)));
- out.setExplicitDelims(true);
+ out.setExplicitOutputDelims(true);
}
if (in.hasOption(MYSQL_DELIMITERS_ARG)) {
@@ -892,7 +1000,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
out.setLinesTerminatedBy('\n');
out.setEscapedBy('\\');
out.setEnclosedBy('\'');
- out.setExplicitDelims(true);
+ out.setExplicitOutputDelims(true);
}
}
@@ -901,28 +1009,33 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
if (in.hasOption(INPUT_FIELDS_TERMINATED_BY_ARG)) {
out.setInputFieldsTerminatedBy(SqoopOptions.toChar(
in.getOptionValue(INPUT_FIELDS_TERMINATED_BY_ARG)));
+ out.setExplicitInputDelims(true);
}
if (in.hasOption(INPUT_LINES_TERMINATED_BY_ARG)) {
out.setInputLinesTerminatedBy(SqoopOptions.toChar(
in.getOptionValue(INPUT_LINES_TERMINATED_BY_ARG)));
+ out.setExplicitInputDelims(true);
}
if (in.hasOption(INPUT_OPTIONALLY_ENCLOSED_BY_ARG)) {
out.setInputEnclosedBy(SqoopOptions.toChar(
in.getOptionValue(INPUT_OPTIONALLY_ENCLOSED_BY_ARG)));
out.setInputEncloseRequired(false);
+ out.setExplicitInputDelims(true);
}
if (in.hasOption(INPUT_ENCLOSED_BY_ARG)) {
out.setInputEnclosedBy(SqoopOptions.toChar(
in.getOptionValue(INPUT_ENCLOSED_BY_ARG)));
out.setInputEncloseRequired(true);
+ out.setExplicitInputDelims(true);
}
if (in.hasOption(INPUT_ESCAPED_BY_ARG)) {
out.setInputEscapedBy(SqoopOptions.toChar(
in.getOptionValue(INPUT_ESCAPED_BY_ARG)));
+ out.setExplicitInputDelims(true);
}
}
@@ -1021,7 +1134,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
protected void validateOutputFormatOptions(SqoopOptions options)
throws InvalidOptionsException {
if (options.doHiveImport()) {
- if (!options.explicitDelims()) {
+ if (!options.explicitOutputDelims()) {
// user hasn't manually specified delimiters, and wants to import
// straight to Hive. Use Hive-style delimiters.
LOG.info("Using Hive-specific delimiters for output. You can override");
@@ -1050,6 +1163,14 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
+ " option." + HELP_STR);
}
+ // Make sure that one of hCatalog or hive jobs are used
+ String hCatTable = options.getHCatTableName();
+ if (hCatTable != null && options.doHiveImport()) {
+ throw new InvalidOptionsException("The " + HCATALOG_TABLE_ARG
+ + " option conflicts with the " + HIVE_IMPORT_ARG
+ + " option." + HELP_STR);
+ }
+
if(options.doHiveImport()
&& options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
throw new InvalidOptionsException("Hive import is not compatible with "
@@ -1083,16 +1204,19 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
}
// Warn about using hive specific arguments without hive import itself
+ // In HCatalog support some of the Hive options are reused
if (!options.doHiveImport()
&& ((options.getHiveHome() != null
- && !options.getHiveHome().equals(SqoopOptions.getHiveHomeDefault()))
+ && !options.getHiveHome().
+ equals(SqoopOptions.getHiveHomeDefault())
+ && hCatTable == null)
|| options.doOverwriteHiveTable()
|| options.doFailIfHiveTableExists()
|| (options.getHiveTableName() != null
&& !options.getHiveTableName().equals(options.getTableName()))
- || options.getHivePartitionKey() != null
- || options.getHivePartitionValue() != null
- || options.getMapColumnHive().size() > 0)) {
+ || (options.getHivePartitionKey() != null && hCatTable == null)
+ || (options.getHivePartitionValue() != null && hCatTable == null)
+ || (options.getMapColumnHive().size() > 0 && hCatTable == null))) {
LOG.warn("It seems that you've specified at least one of following:");
LOG.warn("\t--hive-home");
LOG.warn("\t--hive-overwrite");
@@ -1105,6 +1229,89 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
LOG.warn("those arguments will not be used in this session. Either");
LOG.warn("specify --hive-import to apply them correctly or remove them");
LOG.warn("from command line to remove this warning.");
+ LOG.info("Please note that --hive-home, --hive-partition-key, ");
+ LOG.info("\t hive-partition-value and --map-column-hive options are ");
+ LOG.info("\t are also valid for HCatalog imports and exports");
+ }
+ }
+
+ protected void validateHCatalogOptions(SqoopOptions options)
+ throws InvalidOptionsException {
+ // Make sure that one of hCatalog or hive jobs are used
+ String hCatTable = options.getHCatTableName();
+ if (hCatTable == null) {
+ if (options.getHCatHome() != null && !options.getHCatHome().
+ equals(SqoopOptions.getHCatHomeDefault())) {
+ LOG.warn("--hcatalog-home option will be ignored in "
+ + "non-HCatalog jobs");
+ }
+ if (options.getHCatDatabaseName() != null) {
+ LOG.warn("--hcatalog-database option will be ignored "
+ + "without --hcatalog-table");
+ }
+
+ if (options.getHCatStorageStanza() != null) {
+ LOG.warn("--hcatalog-storage-stanza option will be ignored "
+ + "without --hatalog-table");
+ }
+ return;
+ }
+
+ if (options.explicitInputDelims()) {
+ LOG.warn("Input field/record delimiter options are not "
+ + "used in HCatalog jobs unless the format is text. It is better "
+ + "to use --hive-import in those cases. For text formats");
+ }
+ if (options.explicitOutputDelims()
+ || options.getHiveDelimsReplacement() != null
+ || options.doHiveDropDelims()) {
+ LOG.warn("Output field/record delimiter options are not useful"
+ + " in HCatalog jobs for most of the output types except text based "
+ + " formats is text. It is better "
+ + "to use --hive-import in those cases. For non text formats, ");
+ }
+ if (options.doHiveImport()) {
+ throw new InvalidOptionsException("The " + HCATALOG_TABLE_ARG
+ + " option conflicts with the " + HIVE_IMPORT_ARG
+ + " option." + HELP_STR);
+ }
+ if (options.getTargetDir() != null) {
+ throw new InvalidOptionsException("The " + TARGET_DIR_ARG
+ + " option conflicts with the " + HCATALOG_TABLE_ARG
+ + " option." + HELP_STR);
+ }
+ if (options.getWarehouseDir() != null) {
+ throw new InvalidOptionsException("The " + WAREHOUSE_DIR_ARG
+ + " option conflicts with the " + HCATALOG_TABLE_ARG
+ + " option." + HELP_STR);
+ }
+ if (options.isDirect()) {
+ throw new InvalidOptionsException("Direct import is incompatible with "
+ + "HCatalog. Please remove the parameter --direct");
+ }
+ if (options.isAppendMode()) {
+ throw new InvalidOptionsException("Append mode for imports is not "
+ + " compatible with HCatalog. Please remove the parameter"
+ + "--append-mode");
+ }
+ if (options.getExportDir() != null) {
+ throw new InvalidOptionsException("The " + EXPORT_PATH_ARG
+ + " option conflicts with the " + HCATALOG_TABLE_ARG
+ + " option." + HELP_STR);
+ }
+
+ if (options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
+ throw new InvalidOptionsException("HCatalog job is not compatible with "
+ + " AVRO format option " + FMT_AVRODATAFILE_ARG
+ + " option." + HELP_STR);
+
+ }
+
+ if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
+ throw new InvalidOptionsException("HCatalog job is not compatible with "
+ + "SequenceFile format option " + FMT_SEQUENCEFILE_ARG
+ + " option." + HELP_STR);
+
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/tool/CodeGenTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/CodeGenTool.java b/src/java/org/apache/sqoop/tool/CodeGenTool.java
index dd34a97..c1ea881 100644
--- a/src/java/org/apache/sqoop/tool/CodeGenTool.java
+++ b/src/java/org/apache/sqoop/tool/CodeGenTool.java
@@ -160,6 +160,7 @@ public class CodeGenTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
toolOptions.addUniqueOptions(getOutputFormatOptions());
toolOptions.addUniqueOptions(getInputFormatOptions());
toolOptions.addUniqueOptions(getHiveOptions(true));
+ toolOptions.addUniqueOptions(getHCatalogOptions());
}
@Override
@@ -188,6 +189,7 @@ public class CodeGenTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
applyInputFormatOptions(in, out);
applyCodeGenOptions(in, out, false);
applyHiveOptions(in, out);
+ applyHCatOptions(in, out);
}
@Override
@@ -203,6 +205,7 @@ public class CodeGenTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
validateCodeGenOptions(options);
validateOutputFormatOptions(options);
validateHiveOptions(options);
+ validateHCatalogOptions(options);
if (options.getTableName() == null
&& options.getSqlQuery() == null) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/tool/ExportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ExportTool.java b/src/java/org/apache/sqoop/tool/ExportTool.java
index 215addd..4c7d00c 100644
--- a/src/java/org/apache/sqoop/tool/ExportTool.java
+++ b/src/java/org/apache/sqoop/tool/ExportTool.java
@@ -215,6 +215,7 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
.create());
toolOptions.addUniqueOptions(codeGenOpts);
+ toolOptions.addUniqueOptions(getHCatalogOptions());
}
@Override
@@ -291,6 +292,7 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
applyInputFormatOptions(in, out);
applyOutputFormatOptions(in, out);
applyCodeGenOptions(in, out, false);
+ applyHCatOptions(in, out);
} catch (NumberFormatException nfe) {
throw new InvalidOptionsException("Error: expected numeric argument.\n"
+ "Try --help for usage.");
@@ -307,9 +309,11 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
throw new InvalidOptionsException(
"Export requires a --table or a --call argument."
+ HELP_STR);
- } else if (options.getExportDir() == null) {
+ } else if (options.getExportDir() == null
+ && options.getHCatTableName() == null) {
throw new InvalidOptionsException(
- "Export requires an --export-dir argument."
+ "Export requires an --export-dir argument or "
+ + "--hcatalog-table argument."
+ HELP_STR);
} else if (options.getExistingJarName() != null
&& options.getClassName() == null) {
@@ -382,6 +386,7 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
validateOutputFormatOptions(options);
validateCommonOptions(options);
validateCodeGenOptions(options);
+ validateHCatalogOptions(options);
}
private void applyNewUpdateOptions(CommandLine in, SqoopOptions out)
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 2627726..424d9ec 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -653,6 +653,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
toolOptions.addUniqueOptions(getInputFormatOptions());
toolOptions.addUniqueOptions(getHiveOptions(true));
toolOptions.addUniqueOptions(getHBaseOptions());
+ toolOptions.addUniqueOptions(getHCatalogOptions());
+ toolOptions.addUniqueOptions(getHCatImportOnlyOptions());
// get common codegen opts.
RelatedOptions codeGenOpts = getCodeGenOpts(allTables);
@@ -676,7 +678,7 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
System.out.println("At minimum, you must specify --connect");
} else {
System.out.println(
- "At minimum, you must specify --connect and --table");
+ "At minimum, you must specify --connect and --table");
}
System.out.println(
@@ -819,6 +821,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
applyInputFormatOptions(in, out);
applyCodeGenOptions(in, out, allTables);
applyHBaseOptions(in, out);
+ applyHCatOptions(in, out);
+
} catch (NumberFormatException nfe) {
throw new InvalidOptionsException("Error: expected numeric argument.\n"
+ "Try --help for usage.");
@@ -892,7 +896,12 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
!= SqoopOptions.IncrementalMode.None && options.isValidationEnabled()) {
throw new InvalidOptionsException("Validation is not supported for "
+ "incremental imports but single table only.");
- }
+ } else if ((options.getTargetDir() != null
+ || options.getWarehouseDir() != null)
+ && options.getHCatTableName() != null) {
+ throw new InvalidOptionsException("--hcatalog-table cannot be used "
+ + " --warehouse-dir or --target-dir options");
+ }
}
/**
@@ -936,6 +945,7 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
validateOutputFormatOptions(options);
validateHBaseOptions(options);
validateHiveOptions(options);
+ validateHCatalogOptions(options);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/perftest/ExportStressTest.java
----------------------------------------------------------------------
diff --git a/src/perftest/ExportStressTest.java b/src/perftest/ExportStressTest.java
index 0a41408..b5710e0 100644
--- a/src/perftest/ExportStressTest.java
+++ b/src/perftest/ExportStressTest.java
@@ -117,7 +117,7 @@ public class ExportStressTest extends Configured implements Tool {
options.setNumMappers(4);
options.setLinesTerminatedBy('\n');
options.setFieldsTerminatedBy(',');
- options.setExplicitDelims(true);
+ options.setExplicitOutputDelims(true);
SqoopTool exportTool = new ExportTool();
Sqoop sqoop = new Sqoop(exportTool, getConf(), options);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
index 06f7122..7e361d2 100644
--- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java
+++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
@@ -18,6 +18,9 @@
package com.cloudera.sqoop;
+import org.apache.sqoop.hcat.HCatalogExportTest;
+import org.apache.sqoop.hcat.HCatalogImportTest;
+
import com.cloudera.sqoop.hbase.HBaseImportTest;
import com.cloudera.sqoop.hbase.HBaseQueryImportTest;
import com.cloudera.sqoop.hbase.HBaseUtilTest;
@@ -71,6 +74,10 @@ public final class ThirdPartyTests extends TestCase {
suite.addTestSuite(HBaseQueryImportTest.class);
suite.addTestSuite(HBaseUtilTest.class);
+ // HCatalog
+ suite.addTestSuite(HCatalogImportTest.class);
+ suite.addTestSuite(HCatalogExportTest.class);
+
return suite;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
index 462ccf1..9c47bad 100644
--- a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
+++ b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
@@ -51,6 +51,16 @@ public class TestHiveImport extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
TestHiveImport.class.getName());
+ public void setUp() {
+ super.setUp();
+ HiveImport.setTestMode(true);
+ }
+
+ public void tearDown() {
+ super.tearDown();
+ HiveImport.setTestMode(false);
+ }
+
/**
* Sets the expected number of columns in the table being manipulated
* by the test. Under the hood, this sets the expected column names
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index cf41b96..d6afbc8 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -413,7 +413,7 @@ public abstract class BaseSqoopTestCase extends TestCase {
protected void removeTableDir() {
File tableDirFile = new File(getTablePath().toString());
if (tableDirFile.exists()) {
- // Remove the director where the table will be imported to,
+ // Remove the directory where the table will be imported to,
// prior to running the MapReduce job.
if (!DirUtil.deleteDir(tableDirFile)) {
LOG.warn("Could not delete table directory: "
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
index e13f3df..4421f0c 100644
--- a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
@@ -26,7 +26,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
-import com.cloudera.sqoop.SqoopOptions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,6 +33,7 @@ import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import com.cloudera.sqoop.Sqoop;
+import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
import com.cloudera.sqoop.tool.ExportTool;
@@ -113,7 +113,7 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
}
}
}
-
+ boolean isHCatJob = false;
// The sqoop-specific additional args are then added.
if (null != additionalArgv) {
boolean prevIsFlag = false;
@@ -126,6 +126,9 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
continue;
} else {
// normal argument.
+ if (!isHCatJob && arg.equals("--hcatalog-table")) {
+ isHCatJob = true;
+ }
args.add(arg);
}
}
@@ -135,8 +138,11 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
args.add("--table");
args.add(getTableName());
}
- args.add("--export-dir");
- args.add(getTablePath().toString());
+ // Only add --export-dir if --hcatalog-table was not specified in additionalArgv
+ if (!isHCatJob) {
+ args.add("--export-dir");
+ args.add(getTablePath().toString());
+ }
args.add("--connect");
args.add(getConnectString());
args.add("--fields-terminated-by");