Posted to commits@falcon.apache.org by ba...@apache.org on 2016/03/14 22:34:50 UTC
falcon git commit: FALCON-1836 Import from database to HCatalog
Repository: falcon
Updated Branches:
refs/heads/master 9ec4c23a1 -> aec6084e7
FALCON-1836 Import from database to HCatalog
Author: Venkatesan Ramachandran <vr...@hortonworks.com>
Reviewers: "Balu Vellanki <bv...@hortonworks.com>, Ajay Yadava <aj...@apache.org>, Peeyush Bishnoi <bp...@yahoo.co.in>"
Closes #61 from vramachan/master
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aec6084e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aec6084e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aec6084e
Branch: refs/heads/master
Commit: aec6084e7df82574c0b41063ab0f4a4115cbf25d
Parents: 9ec4c23
Author: Venkatesan Ramachandran <vr...@hortonworks.com>
Authored: Mon Mar 14 14:34:28 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Mon Mar 14 14:34:28 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/falcon/entity/HiveUtil.java | 4 +-
.../org/apache/falcon/entity/HiveUtilTest.java | 4 +-
.../engine/oozie/utils/OozieBuilderUtils.java | 4 +-
oozie/pom.xml | 19 +++
.../oozie/DatabaseExportWorkflowBuilder.java | 116 ++++++++++++++-----
.../oozie/DatabaseImportWorkflowBuilder.java | 115 ++++++++++++++----
.../falcon/oozie/ExportWorkflowBuilder.java | 5 +-
.../oozie/FeedExportCoordinatorBuilder.java | 14 +--
.../apache/falcon/oozie/ImportExportCommon.java | 46 +++++++-
.../falcon/oozie/ImportWorkflowBuilder.java | 5 +-
.../OozieOrchestrationWorkflowBuilder.java | 6 +-
.../java/org/apache/falcon/util/OozieUtils.java | 28 +++++
.../apache/falcon/lifecycle/FeedExportIT.java | 115 ++++++++++++++++++
.../apache/falcon/lifecycle/FeedImportIT.java | 56 ++++++++-
.../falcon/resource/AbstractTestContext.java | 3 +-
.../test/resources/feed-export-template6.xml | 56 +++++++++
webapp/src/test/resources/feed-template5.xml | 55 +++++++++
17 files changed, 575 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
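Context for the diff below: the import/export workflow builders assemble one space-separated Sqoop command string and hand it to the Oozie sqoop action through the sqoopCommand property. A minimal sketch, assuming a hypothetical HSQLDB connect string and a hypothetical data-out name of 'output', of the import command shape produced for a TABLE-storage feed:

    public class SqoopCommandSketch {
        public static void main(String[] args) {
            String sep = " ";  // mirrors ImportExportCommon.ARG_SEPARATOR
            String cmd = new StringBuilder("import").append(sep)
                    .append("--connect").append(sep).append("jdbc:hsqldb:hsql://localhost/db").append(sep)
                    .append("--table").append(sep).append("simple").append(sep)
                    .append("--hcatalog-database").append(sep).append("${coord:databaseOut('output')}").append(sep)
                    .append("--hcatalog-table").append(sep).append("${coord:tableOut('output')}")
                    .toString();
            System.out.println(cmd);
        }
    }

Everything after "import" is plain Sqoop syntax; the coord EL expressions are resolved by Oozie when the coordinator action materializes.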
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java b/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
index f4029e4..f8eaebb 100644
--- a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
@@ -29,7 +29,7 @@ import java.util.Properties;
*/
public final class HiveUtil {
public static final String METASTOREURIS = "hive.metastore.uris";
- public static final String METASTROE_URI = "hcat.metastore.uri";
+ public static final String METASTORE_URI = "hcat.metastore.uri";
public static final String NODE = "hcatNode";
public static final String METASTORE_UGI = "hive.metastore.execute.setugi";
@@ -48,7 +48,7 @@ public final class HiveUtil {
hiveCredentials.put(METASTOREURIS, metaStoreUrl);
hiveCredentials.put(METASTORE_UGI, "true");
hiveCredentials.put(NODE, metaStoreUrl.replace("thrift", "hcat"));
- hiveCredentials.put(METASTROE_URI, metaStoreUrl);
+ hiveCredentials.put(METASTORE_URI, metaStoreUrl);
if (SecurityUtil.isSecurityEnabled()) {
String principal = ClusterHelper
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java b/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
index c37cebd..7f890f3 100644
--- a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
@@ -55,7 +55,7 @@ public class HiveUtilTest {
Properties expected = new Properties();
expected.put(HiveUtil.METASTORE_UGI, "true");
expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
- expected.put(HiveUtil.METASTROE_URI, metaStoreUrl);
+ expected.put(HiveUtil.METASTORE_URI, metaStoreUrl);
expected.put(HiveUtil.METASTOREURIS, metaStoreUrl);
Properties actual = HiveUtil.getHiveCredentials(cluster);
@@ -91,7 +91,7 @@ public class HiveUtilTest {
expected.put(SecurityUtil.METASTORE_PRINCIPAL, principal);
expected.put(HiveUtil.METASTORE_UGI, "true");
expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
- expected.put(HiveUtil.METASTROE_URI, metaStoreUrl);
+ expected.put(HiveUtil.METASTORE_URI, metaStoreUrl);
expected.put(HiveUtil.METASTOREURIS, metaStoreUrl);
Properties actual = HiveUtil.getHiveCredentials(cluster);
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
index 732a9e7..8f1b53b 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -417,7 +417,7 @@ public final class OozieBuilderUtils {
credential.setName(credentialName);
credential.setType("hcat");
- credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl));
+ credential.getProperty().add(createProperty(HiveUtil.METASTORE_URI, metaStoreUrl));
credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL,
ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)));
@@ -441,7 +441,7 @@ public final class OozieBuilderUtils {
hiveCredentials.put(HiveUtil.METASTOREURIS, metaStoreUrl);
hiveCredentials.put(HiveUtil.METASTORE_UGI, "true");
hiveCredentials.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
- hiveCredentials.put(HiveUtil.METASTROE_URI, metaStoreUrl);
+ hiveCredentials.put(HiveUtil.METASTORE_URI, metaStoreUrl);
if (SecurityUtil.isSecurityEnabled()) {
String principal = ClusterHelper
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 4623d8b..c53d33c 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -186,6 +186,25 @@
</configuration>
</execution>
<execution>
+ <id>sqoop-gen</id>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <configuration>
+ <forceRegenerate>true</forceRegenerate>
+ <generatePackage>org.apache.falcon.oozie.sqoop</generatePackage>
+ <schemas>
+ <schema>
+ <dependencyResource>
+ <groupId>org.apache.oozie</groupId>
+ <artifactId>oozie-client</artifactId>
+ <resource>sqoop-action-0.3.xsd</resource>
+ </dependencyResource>
+ </schema>
+ </schemas>
+ </configuration>
+ </execution>
+ <execution>
<id>bundle-gen</id>
<goals>
<goal>generate</goal>
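The new sqoop-gen execution has the jaxb2 plugin generate classes into org.apache.falcon.oozie.sqoop from the sqoop-action-0.3.xsd schema bundled inside the oozie-client artifact. A hedged sanity sketch (the wrapper class name is hypothetical) that should compile once generation has run:

    import org.apache.falcon.oozie.sqoop.ACTION;

    public class GeneratedSqoopActionCheck {
        public static void main(String[] args) {
            ACTION sqoop = new ACTION();              // generated from sqoop-action-0.3.xsd
            sqoop.getJobXml().add("hive-site.xml");   // repeated <job-xml> elements map to a List<String>
            System.out.println(sqoop.getJobXml());
        }
    }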
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
index d69611b..284c4a3 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
@@ -18,18 +18,25 @@
package org.apache.falcon.oozie;
+import com.google.common.base.Splitter;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.DatasourceHelper;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LoadMethod;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.fs.Path;
+import javax.xml.bind.JAXBElement;
+import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@@ -49,22 +56,27 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
}
@Override
- protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException {
+ protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
+ throws FalconException {
- addLibExtensionsToWorkflow(cluster, workflow, Tag.EXPORT);
+ ACTION action = unmarshalAction(EXPORT_SQOOP_ACTION_TEMPLATE);
+ JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionJaxbElement = OozieUtils.unMarshalSqoopAction(action);
+ org.apache.falcon.oozie.sqoop.ACTION sqoopExport = actionJaxbElement.getValue();
+
+ Properties props = new Properties();
+ ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath);
+ sqoopExport.getJobXml().add("${wf:appPath()}/conf/hive-site.xml");
+ OozieUtils.marshalSqoopAction(action, actionJaxbElement);
- ACTION sqoopExport = unmarshalAction(EXPORT_SQOOP_ACTION_TEMPLATE);
- addTransition(sqoopExport, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(sqoopExport);
+ addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(action);
//Add post-processing actions
ACTION success = getSuccessPostProcessAction();
- // delete addHDFSServersConfig(success, src, target);
addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(success);
ACTION fail = getFailPostProcessAction();
- // delete addHDFSServersConfig(fail, src, target);
addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(fail);
@@ -74,7 +86,6 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
// build the sqoop command and put it in the properties
String sqoopCmd = buildSqoopCommand(cluster, entity);
LOG.info("SQOOP EXPORT COMMAND : " + sqoopCmd);
- Properties props = new Properties();
props.put("sqoopCommand", sqoopCmd);
return props;
}
@@ -86,28 +97,21 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
buildConnectArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
buildTableArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
- ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, cluster, entity)
+ Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getExportDatasourceName(
+ FeedHelper.getCluster(entity, cluster.getName())));
+ ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, datasource)
.append(ImportExportCommon.ARG_SEPARATOR);
buildNumMappers(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
- buildArguments(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
+ buildArguments(sqoopArgs, extraArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR);
buildLoadType(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
- buildExportDirArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
+ buildExportArg(sqoopArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR);
- StringBuffer sqoopCmd = new StringBuffer();
+ StringBuilder sqoopCmd = new StringBuilder();
return sqoopCmd.append("export").append(ImportExportCommon.ARG_SEPARATOR)
.append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR)
.append(sqoopArgs).toString();
}
- private StringBuilder buildDriverArgs(StringBuilder builder, Cluster cluster) throws FalconException {
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
- Datasource db = DatasourceHelper.getDatasource(FeedHelper.getExportDatasourceName(feedCluster));
- if ((db.getDriver() != null) && (db.getDriver().getClazz() != null)) {
- builder.append("--driver").append(ImportExportCommon.ARG_SEPARATOR).append(db.getDriver().getClazz());
- }
- return builder;
- }
-
private StringBuilder buildConnectArg(StringBuilder builder, Cluster cluster) throws FalconException {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
return builder.append("--connect").append(ImportExportCommon.ARG_SEPARATOR)
@@ -132,18 +136,32 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
return builder.append(modeType);
}
+ private StringBuilder buildExportArg(StringBuilder builder, Feed feed, Cluster cluster)
+ throws FalconException {
+ Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster);
+ if (feedStorageType == Storage.TYPE.TABLE) {
+ return buildExportTableArg(builder, feed.getTable());
+ } else {
+ return buildExportDirArg(builder, cluster);
+ }
+ }
+
private StringBuilder buildExportDirArg(StringBuilder builder, Cluster cluster)
throws FalconException {
return builder.append("--export-dir").append(ImportExportCommon.ARG_SEPARATOR)
- .append(String.format("${coord:dataIn('%s')}",
- FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME));
+ .append(String.format("${coord:dataIn('%s')}",
+ FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME));
}
- private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs)
- throws FalconException {
+ private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs, Feed feed,
+ Cluster cluster) throws FalconException {
+ Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster);
for(Map.Entry<String, String> e : extraArgs.entrySet()) {
+ if ((feedStorageType == Storage.TYPE.TABLE) && (e.getKey().equals("--update-key"))) {
+ continue;
+ }
builder.append(e.getKey()).append(ImportExportCommon.ARG_SEPARATOR).append(e.getValue())
- .append(ImportExportCommon.ARG_SEPARATOR);
+ .append(ImportExportCommon.ARG_SEPARATOR);
}
return builder;
}
@@ -169,4 +187,50 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
return FeedHelper.getExportArguments(feedCluster);
}
+
+ private StringBuilder buildExportTableArg(StringBuilder builder, CatalogTable catalog) throws FalconException {
+
+ LOG.info("Catalog URI {}", catalog.getUri());
+ builder.append("--skip-dist-cache").append(ImportExportCommon.ARG_SEPARATOR);
+ Iterator<String> itr = Splitter.on("#").split(catalog.getUri()).iterator();
+ String dbTable = itr.next();
+ String partitions = itr.next();
+ Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator();
+ itrDbTable.next();
+ String db = itrDbTable.next();
+ String table = itrDbTable.next();
+ LOG.debug("Target database {}, table {}", db, table);
+ builder.append("--hcatalog-database").append(ImportExportCommon.ARG_SEPARATOR)
+ .append(String.format("${coord:databaseIn('%s')}", FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME))
+ .append(ImportExportCommon.ARG_SEPARATOR);
+
+ builder.append("--hcatalog-table").append(ImportExportCommon.ARG_SEPARATOR)
+ .append(String.format("${coord:tableIn('%s')}", FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME))
+ .append(ImportExportCommon.ARG_SEPARATOR);
+
+ Map<String, String> partitionsMap = ImportExportCommon.getPartitionKeyValues(partitions);
+ if (partitionsMap.size() > 0) {
+ StringBuilder partitionKeys = new StringBuilder();
+ StringBuilder partitionValues = new StringBuilder();
+ for (Map.Entry<String, String> e : partitionsMap.entrySet()) {
+ partitionKeys.append(e.getKey());
+ partitionKeys.append(',');
+ partitionValues.append(String.format("${coord:dataInPartitionMin('%s','%s')}",
+ FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME,
+ e.getKey()));
+ partitionValues.append(',');
+ }
+ if (partitionsMap.size() > 0) {
+ partitionKeys.setLength(partitionKeys.length()-1);
+ partitionValues.setLength(partitionValues.length()-1);
+ }
+ LOG.debug("partitionKeys {} and partitionValue {}", partitionKeys.toString(), partitionValues.toString());
+ builder.append("--hcatalog-partition-keys").append(ImportExportCommon.ARG_SEPARATOR)
+ .append(partitionKeys.toString()).append(ImportExportCommon.ARG_SEPARATOR);
+ builder.append("--hcatalog-partition-values").append(ImportExportCommon.ARG_SEPARATOR)
+ .append(partitionValues.toString()).append(ImportExportCommon.ARG_SEPARATOR);
+ }
+ return builder;
+ }
}
+
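buildExportTableArg above splits the feed's catalog table URI of the form catalog:<db>:<table>#<key=value;...>. A standalone sketch of that split, reusing the same Guava Splitter calls, with the sample URI taken from feed-template5.xml further down:

    import com.google.common.base.Splitter;
    import java.util.Iterator;

    public class CatalogUriSplitSketch {
        public static void main(String[] args) {
            String uri = "catalog:SqoopTestDB:SqoopTestTable#year=${YEAR};month=${MONTH}";
            Iterator<String> itr = Splitter.on("#").split(uri).iterator();
            String dbTable = itr.next();            // catalog:SqoopTestDB:SqoopTestTable
            String partitions = itr.next();         // year=${YEAR};month=${MONTH}
            Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator();
            itrDbTable.next();                      // skip the "catalog" scheme
            String db = itrDbTable.next();          // SqoopTestDB
            String table = itrDbTable.next();       // SqoopTestTable
            System.out.println(db + "." + table + " partitions=" + partitions);
        }
    }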
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
index 66bfa9b..3e24428 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
@@ -18,25 +18,35 @@
package org.apache.falcon.oozie;
+import com.google.common.base.Splitter;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.DatasourceHelper;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.workflow.WorkflowExecutionContext;
+import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBElement;
+
/**
* Builds Datasource import workflow for Oozie.
*/
public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
+
protected static final String IMPORT_SQOOP_ACTION_TEMPLATE = "/action/feed/import-sqoop-database-action.xml";
protected static final String IMPORT_ACTION_NAME="db-import-sqoop";
@@ -48,23 +58,27 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
}
@Override
- protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException {
+ protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
+ throws FalconException {
- addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);
+ ACTION action = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE);
+ JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionJaxbElement = OozieUtils.unMarshalSqoopAction(action);
+ org.apache.falcon.oozie.sqoop.ACTION sqoopImport = actionJaxbElement.getValue();
+
+ Properties props = new Properties();
+ ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath);
+ sqoopImport.getJobXml().add("${wf:appPath()}/conf/hive-site.xml");
+ OozieUtils.marshalSqoopAction(action, actionJaxbElement);
- ACTION sqoopImport = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE);
- // delete addHDFSServersConfig(sqoopImport, src, target);
- addTransition(sqoopImport, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(sqoopImport);
+ addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(action);
//Add post-processing actions
ACTION success = getSuccessPostProcessAction();
- // delete addHDFSServersConfig(success, src, target);
addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(success);
ACTION fail = getFailPostProcessAction();
- // delete addHDFSServersConfig(fail, src, target);
addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(fail);
@@ -73,8 +87,7 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
// build the sqoop command and put it in the properties
String sqoopCmd = buildSqoopCommand(cluster, entity);
- LOG.info("SQOOP COMMAND : " + sqoopCmd);
- Properties props = new Properties();
+ LOG.info("SQOOP IMPORT COMMAND : " + sqoopCmd);
props.put("sqoopCommand", sqoopCmd);
return props;
}
@@ -86,16 +99,18 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
buildDriverArgs(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
buildConnectArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
buildTableArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
- ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, cluster, entity)
- .append(ImportExportCommon.ARG_SEPARATOR);
+ Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(
+ FeedHelper.getCluster(entity, cluster.getName())));
+ ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, datasource)
+ .append(ImportExportCommon.ARG_SEPARATOR);
buildNumMappers(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
buildArguments(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
- buildTargetDirArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
+ buildTargetArg(sqoopArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR);
- StringBuffer sqoopCmd = new StringBuffer();
+ StringBuilder sqoopCmd = new StringBuilder();
return sqoopCmd.append("import").append(ImportExportCommon.ARG_SEPARATOR)
- .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR)
- .append(sqoopArgs).toString();
+ .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR)
+ .append(sqoopArgs).toString();
}
private StringBuilder buildDriverArgs(StringBuilder builder, Cluster cluster) throws FalconException {
@@ -110,29 +125,40 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
private StringBuilder buildConnectArg(StringBuilder builder, Cluster cluster) throws FalconException {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
return builder.append("--connect").append(ImportExportCommon.ARG_SEPARATOR)
- .append(DatasourceHelper.getReadOnlyEndpoint(
- DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster))));
+ .append(DatasourceHelper.getReadOnlyEndpoint(
+ DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster))));
}
private StringBuilder buildTableArg(StringBuilder builder, Cluster cluster) throws FalconException {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
return builder.append("--table").append(ImportExportCommon.ARG_SEPARATOR)
- .append(FeedHelper.getImportDataSourceTableName(feedCluster));
+ .append(FeedHelper.getImportDataSourceTableName(feedCluster));
}
- private StringBuilder buildTargetDirArg(StringBuilder builder, Cluster cluster)
+ private StringBuilder buildTargetArg(StringBuilder builder, Feed feed, Cluster cluster)
+ throws FalconException {
+ Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster);
+ if (feedStorageType == Storage.TYPE.TABLE) {
+ return buildTargetTableArg(builder, feed.getTable());
+
+ } else {
+ return buildTargetDirArg(builder);
+ }
+ }
+
+ private StringBuilder buildTargetDirArg(StringBuilder builder)
throws FalconException {
return builder.append("--delete-target-dir").append(ImportExportCommon.ARG_SEPARATOR)
.append("--target-dir").append(ImportExportCommon.ARG_SEPARATOR)
.append(String.format("${coord:dataOut('%s')}",
- FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
+ FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
}
private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs)
throws FalconException {
for(Map.Entry<String, String> e : extraArgs.entrySet()) {
builder.append(e.getKey()).append(ImportExportCommon.ARG_SEPARATOR).append(e.getValue())
- .append(ImportExportCommon.ARG_SEPARATOR);
+ .append(ImportExportCommon.ARG_SEPARATOR);
}
return builder;
}
@@ -158,4 +184,49 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
return FeedHelper.getImportArguments(feedCluster);
}
+
+ private StringBuilder buildTargetTableArg(StringBuilder builder, CatalogTable catalog) throws FalconException {
+
+ LOG.info("Catalog URI {}", catalog.getUri());
+ builder.append("--skip-dist-cache").append(ImportExportCommon.ARG_SEPARATOR);
+ Iterator<String> itr = Splitter.on("#").split(catalog.getUri()).iterator();
+ String dbTable = itr.next();
+ String partitions = itr.next();
+ Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator();
+ itrDbTable.next();
+ String db = itrDbTable.next();
+ String table = itrDbTable.next();
+ LOG.debug("Target database {}, table {}", db, table);
+ builder.append("--hcatalog-database").append(ImportExportCommon.ARG_SEPARATOR)
+ .append(String.format("${coord:databaseOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME))
+ .append(ImportExportCommon.ARG_SEPARATOR);
+
+ builder.append("--hcatalog-table").append(ImportExportCommon.ARG_SEPARATOR)
+ .append(String.format("${coord:tableOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME))
+ .append(ImportExportCommon.ARG_SEPARATOR);
+
+ Map<String, String> partitionsMap = ImportExportCommon.getPartitionKeyValues(partitions);
+ if (partitionsMap.size() > 0) {
+ StringBuilder partitionKeys = new StringBuilder();
+ StringBuilder partitionValues = new StringBuilder();
+ for (Map.Entry<String, String> e : partitionsMap.entrySet()) {
+ partitionKeys.append(e.getKey());
+ partitionKeys.append(',');
+ partitionValues.append(String.format("${coord:dataOutPartitionValue('%s','%s')}",
+ FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME,
+ e.getKey()));
+ partitionValues.append(',');
+ }
+ if (partitionsMap.size() > 0) {
+ partitionKeys.setLength(partitionKeys.length()-1);
+ partitionValues.setLength(partitionValues.length()-1);
+ }
+ LOG.debug("partitionKeys {} and partitionValue {}", partitionKeys.toString(), partitionValues.toString());
+ builder.append("--hcatalog-partition-keys").append(ImportExportCommon.ARG_SEPARATOR)
+ .append(partitionKeys.toString()).append(ImportExportCommon.ARG_SEPARATOR);
+ builder.append("--hcatalog-partition-values").append(ImportExportCommon.ARG_SEPARATOR)
+ .append(partitionValues.toString()).append(ImportExportCommon.ARG_SEPARATOR);
+ }
+ return builder;
+ }
}
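The --hcatalog-partition-keys/--hcatalog-partition-values pair above is derived from the partition spec in the catalog URI. A minimal sketch of that derivation, with literal values standing in for the coord EL expressions the builder actually emits:

    import com.google.common.base.Splitter;
    import java.util.Map;

    public class PartitionArgsSketch {
        public static void main(String[] args) {
            // Same parsing as ImportExportCommon.getPartitionKeyValues
            Map<String, String> parts = Splitter.on(";").withKeyValueSeparator("=")
                    .split("year=2016;month=03;day=14;hour=22");
            StringBuilder keys = new StringBuilder();
            StringBuilder values = new StringBuilder();
            for (Map.Entry<String, String> e : parts.entrySet()) {
                keys.append(e.getKey()).append(',');
                values.append(e.getValue()).append(','); // the builder uses coord EL functions here
            }
            keys.setLength(keys.length() - 1);           // drop the trailing comma
            values.setLength(values.length() - 1);
            System.out.println(keys + " / " + values);   // year,month,day,hour / 2016,03,14,22
        }
    }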
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
index a55656c..af7431a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
@@ -47,7 +47,7 @@ public abstract class ExportWorkflowBuilder extends OozieOrchestrationWorkflowBu
WORKFLOWAPP workflow = new WORKFLOWAPP();
String wfName = EntityUtil.getWorkflowName(Tag.EXPORT, entity).toString();
workflow.setName(wfName);
- Properties p = getWorkflow(cluster, workflow);
+ Properties p = getWorkflow(cluster, workflow, buildPath);
marshal(cluster, workflow, buildPath);
Properties props = FeedHelper.getFeedProperties(entity);
@@ -81,5 +81,6 @@ public abstract class ExportWorkflowBuilder extends OozieOrchestrationWorkflowBu
return props;
}
- protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException;
+ protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
+ throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java
index 1bfacc2..4437d8b 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java
@@ -53,7 +53,6 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
}
public static final String EXPORT_DATASET_NAME = "export-dataset";
-
public static final String EXPORT_DATAIN_NAME = "export-input";
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedExportCoordinatorBuilder.class);
@@ -63,19 +62,19 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
LOG.info("Generating Feed EXPORT coordinator.");
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster((Feed) entity, cluster.getName());
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
if (!FeedHelper.isExportEnabled(feedCluster)) {
return null;
}
- if (feedCluster.getValidity().getEnd().before(new Date())) {
+ if ((feedCluster.getValidity() != null) && (feedCluster.getValidity().getEnd().before(new Date()))) {
LOG.warn("Feed IMPORT is not applicable as Feed's end time for cluster {} is not in the future",
cluster.getName());
return null;
}
COORDINATORAPP coord = new COORDINATORAPP();
- initializeCoordAttributes(coord, (Feed) entity, cluster);
+ initializeCoordAttributes(coord, entity, cluster);
Properties props = createCoordDefaultConfiguration(getEntityName());
initializeInputPath(coord, cluster, props);
@@ -108,8 +107,8 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
coord.setInputEvents(new INPUTEVENTS());
}
- Storage storage = FeedHelper.createStorage(cluster, (Feed) entity);
- SYNCDATASET syncdataset = createDataSet((Feed) entity, cluster, storage,
+ Storage storage = FeedHelper.createStorage(cluster, entity);
+ SYNCDATASET syncdataset = createDataSet(entity, cluster, storage,
EXPORT_DATASET_NAME, LocationType.DATA);
if (syncdataset == null) {
@@ -126,6 +125,7 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
datain.setDataset(EXPORT_DATASET_NAME);
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
datain.getInstance().add(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+ datain.getInstance().add(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
return datain;
}
@@ -138,7 +138,7 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
* @param storage
* @param datasetName
* @param locationType
- * @return
+ * @return Sync dataset
* @throws FalconException
*/
private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
index 19b567c..52c7820 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
@@ -18,15 +18,27 @@
package org.apache.falcon.oozie;
+import com.google.common.base.Splitter;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.DatasourceHelper;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.datasource.Credential;
import org.apache.falcon.entity.v0.datasource.Credentialtype;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
import java.net.URI;
import java.net.URISyntaxException;
@@ -38,13 +50,17 @@ public final class ImportExportCommon {
static final String ARG_SEPARATOR = " ";
+ public static final Logger LOG = LoggerFactory.getLogger(ImportExportCommon.class);
+
+ private static final Set<String> FALCON_IMPORT_SQOOP_ACTIONS = new HashSet<>(
+ Arrays.asList(new String[]{ OozieOrchestrationWorkflowBuilder.PREPROCESS_ACTION_NAME,
+ OozieOrchestrationWorkflowBuilder.USER_ACTION_NAME, }));
+
private ImportExportCommon() {
}
- static StringBuilder buildUserPasswordArg(StringBuilder builder, StringBuilder sqoopOpts,
- Cluster cluster, Feed entity) throws FalconException {
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
- Datasource db = DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster));
+ static StringBuilder buildUserPasswordArg(StringBuilder builder, StringBuilder sqoopOpts, Datasource db)
+ throws FalconException {
Credential cred = DatasourceHelper.getReadPasswordInfo(db);
builder.append("--username").append(ARG_SEPARATOR)
.append(cred.getUserName())
@@ -70,4 +86,26 @@ public final class ImportExportCommon {
}
return builder;
}
+
+ public static void addHCatalogProperties(Properties props, Feed entity, Cluster cluster,
+ WORKFLOWAPP workflow, OozieOrchestrationWorkflowBuilder<Feed> wBuilder, Path buildPath)
+ throws FalconException {
+ if (FeedHelper.getStorageType(entity, cluster) == Storage.TYPE.TABLE) {
+ wBuilder.createHiveConfiguration(cluster, buildPath, "");
+ addHCatalogShareLibs(props);
+ if (SecurityUtil.isSecurityEnabled()) {
+ // add hcatalog credentials for secure mode and add a reference to each action
+ wBuilder.addHCatalogCredentials(workflow, cluster,
+ OozieOrchestrationWorkflowBuilder.HIVE_CREDENTIAL_NAME, FALCON_IMPORT_SQOOP_ACTIONS);
+ }
+ }
+ }
+ private static void addHCatalogShareLibs(Properties props) throws FalconException {
+ props.put("oozie.action.sharelib.for.sqoop", "sqoop,hive,hcatalog");
+ }
+
+ public static Map<String, String> getPartitionKeyValues(String partitionStr) {
+ return Splitter.on(";").withKeyValueSeparator("=").split(partitionStr);
+ }
+
}
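For TABLE-storage feeds, addHCatalogProperties boils down to two pieces of Oozie wiring: the sharelib override sketched below, plus the hive-site.xml job-xml reference the builders add to the sqoop action.

    import java.util.Properties;

    public class SqoopHCatWiringSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Matches addHCatalogShareLibs: pull sqoop, hive and hcatalog jars onto the action classpath.
            props.put("oozie.action.sharelib.for.sqoop", "sqoop,hive,hcatalog");
            System.out.println(props);
        }
    }

On secure clusters the same path also attaches the falconHiveAuth hcat credential to the pre-processing and user actions, per FALCON_IMPORT_SQOOP_ACTIONS.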
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
index cae8497..2d93189 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
@@ -48,7 +48,7 @@ public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBu
WORKFLOWAPP workflow = new WORKFLOWAPP();
String wfName = EntityUtil.getWorkflowName(Tag.IMPORT, entity).toString();
workflow.setName(wfName);
- Properties p = getWorkflow(cluster, workflow);
+ Properties p = getWorkflow(cluster, workflow, buildPath);
marshal(cluster, workflow, buildPath);
Properties props = FeedHelper.getFeedProperties(entity);
@@ -81,5 +81,6 @@ public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBu
return props;
}
- protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException;
+ protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
+ throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index e137e11..181f2d2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -76,7 +76,7 @@ import java.util.Set;
* @param <T>
*/
public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
- protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+ public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
protected static final String USER_ACTION_NAME = "user-action";
protected static final String PREPROCESS_ACTION_NAME = "pre-processing";
@@ -329,7 +329,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
}
// creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
- protected void createHiveConfiguration(Cluster cluster, Path workflowPath,
+ public void createHiveConfiguration(Cluster cluster, Path workflowPath,
String prefix) throws FalconException {
Configuration hiveConf = getHiveCredentialsAsConf(cluster);
@@ -413,7 +413,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
credential.setName(credentialName);
credential.setType("hcat");
- credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl));
+ credential.getProperty().add(createProperty(HiveUtil.METASTORE_URI, metaStoreUrl));
credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL,
ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)));
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 149a7e6..708788b 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -46,6 +46,7 @@ public final class OozieUtils {
public static final JAXBContext BUNDLE_JAXB_CONTEXT;
public static final JAXBContext CONFIG_JAXB_CONTEXT;
protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
+ protected static final JAXBContext SQOOP_ACTION_JAXB_CONTEXT;
static {
try {
@@ -56,6 +57,8 @@ public final class OozieUtils {
CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(CONFIGURATION.class);
HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
+ SQOOP_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
+ org.apache.falcon.oozie.sqoop.ACTION.class.getPackage().getName());
} catch (JAXBException e) {
throw new RuntimeException("Unable to create JAXB context", e);
}
@@ -97,4 +100,29 @@ public final class OozieUtils {
throw new RuntimeException("Unable to marshall hive action.", e);
}
}
+
+ @SuppressWarnings("unchecked")
+ public static JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> unMarshalSqoopAction(
+ org.apache.falcon.oozie.workflow.ACTION wfAction) {
+ try {
+ Unmarshaller unmarshaller = SQOOP_ACTION_JAXB_CONTEXT.createUnmarshaller();
+ unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+ return (JAXBElement<org.apache.falcon.oozie.sqoop.ACTION>)
+ unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to unmarshall sqoop action.", e);
+ }
+ }
+
+ public static void marshalSqoopAction(org.apache.falcon.oozie.workflow.ACTION wfAction,
+ JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionjaxbElement) {
+ try {
+ DOMResult hiveActionDOM = new DOMResult();
+ Marshaller marshaller = SQOOP_ACTION_JAXB_CONTEXT.createMarshaller();
+ marshaller.marshal(actionjaxbElement, hiveActionDOM);
+ wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to marshall sqoop action.", e);
+ }
+ }
}
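These helpers give the builders a typed view of the sqoop action embedded in a workflow ACTION's <any> payload. A sketch of the round trip as the import builder uses it; this fragment assumes it runs inside an OozieOrchestrationWorkflowBuilder subclass, where unmarshalAction is available:

    // Unmarshal the action template, edit the typed sqoop action, marshal it back.
    org.apache.falcon.oozie.workflow.ACTION action =
            unmarshalAction("/action/feed/import-sqoop-database-action.xml");
    javax.xml.bind.JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> el =
            OozieUtils.unMarshalSqoopAction(action);
    el.getValue().getJobXml().add("${wf:appPath()}/conf/hive-site.xml");  // typed edit
    OozieUtils.marshalSqoopAction(action, el);  // re-embed the edited DOM into <any>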
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java
new file mode 100644
index 0000000..194f4c7
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.falcon.util.HsqldbTestUtils;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test for Feed Export.
+ */
+
+@Test
+public class FeedExportIT {
+ public static final Logger LOG = LoggerFactory.getLogger(FeedExportIT.class);
+
+ private static final String DATASOURCE_NAME_KEY = "datasourcename";
+ private static final String METASTORE_URL = "thrift://localhost:49083";
+ private static final String DATABASE_NAME = "SqoopTestDB";
+ private static final String TABLE_NAME = "SqoopTestTable";
+
+ private HCatClient client;
+ private CatalogStorage storage;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ HsqldbTestUtils.start();
+ HsqldbTestUtils.createSqoopUser("sqoop_user", "sqoop");
+ HsqldbTestUtils.changeSAPassword("sqoop");
+ HsqldbTestUtils.createAndPopulateCustomerTable();
+
+ TestContext.cleanupStore();
+ TestContext.prepare();
+
+ // setup hcat
+ CurrentUser.authenticate(TestContext.REMOTE_USER);
+ client = TestContext.getHCatClient(METASTORE_URL);
+
+ HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
+ List<String> partitionKeys = new ArrayList<>();
+ partitionKeys.add("year");
+ partitionKeys.add("month");
+ partitionKeys.add("day");
+ partitionKeys.add("hour");
+ HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ HsqldbTestUtils.tearDown();
+ FileUtils.deleteDirectory(new File("../localhost/"));
+ FileUtils.deleteDirectory(new File("localhost"));
+ }
+
+ @Test
+ public void testFeedExportHSql() throws Exception {
+ Assert.assertEquals(4, HsqldbTestUtils.getNumberOfRows());
+ }
+
+ @Test
+ public void testSqoopExport() throws Exception {
+ TestContext context = new TestContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+ String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ context.setCluster(filePath);
+ LOG.info("entity -submit -type cluster -file " + filePath);
+ Assert.assertEquals(TestContext.executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+
+ // Put a new datasource name into the overlay so that DATASOURCE_TEMPLATE1 and FEED_EXPORT_TEMPLATE6
+ // are populated with the same datasource name
+ String dsName = "datasource-test-1";
+ overlay.put(DATASOURCE_NAME_KEY, dsName);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE1, overlay);
+ LOG.info("Submit datatsource entity {} via entity -submit -type datasource -file {}", dsName, filePath);
+ Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_EXPORT_TEMPLATE6, overlay);
+ LOG.info("Submit export feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName,
+ filePath);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
index c34bcfc..2efe4bb 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
@@ -22,13 +22,17 @@ import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.HiveTestUtils;
import org.apache.falcon.util.HsqldbTestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.HCatClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
@@ -37,6 +41,8 @@ import org.testng.annotations.Test;
import java.io.File;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/**
@@ -48,6 +54,12 @@ public class FeedImportIT {
public static final Logger LOG = LoggerFactory.getLogger(FeedImportIT.class);
private static final String DATASOURCE_NAME_KEY = "datasourcename";
+ private static final String METASTORE_URL = "thrift://localhost:49083";
+ private static final String DATABASE_NAME = "SqoopTestDB";
+ private static final String TABLE_NAME = "SqoopTestTable";
+
+ private HCatClient client;
+ private CatalogStorage storage;
@BeforeClass
public void setUp() throws Exception {
@@ -58,6 +70,18 @@ public class FeedImportIT {
TestContext.cleanupStore();
TestContext.prepare();
+
+ // setup hcat
+ CurrentUser.authenticate(TestContext.REMOTE_USER);
+ client = TestContext.getHCatClient(METASTORE_URL);
+
+ HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
+ List<String> partitionKeys = new ArrayList<>();
+ partitionKeys.add("year");
+ partitionKeys.add("month");
+ partitionKeys.add("day");
+ partitionKeys.add("hour");
+ HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
}
@AfterClass
@@ -154,7 +178,8 @@ public class FeedImportIT {
Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
- LOG.info("Submit FEED entity with datasource {} via entity -submit -type feed -file {}", dsName, filePath);
+ LOG.info("Submit import FEED entity with datasource {} via entity -submit -type feed -file {}",
+ dsName, filePath);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
}
@@ -200,7 +225,8 @@ public class FeedImportIT {
Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
- LOG.info("Submit FEED entity with datasource {} via entity -submit -type feed -file {}", dsName, filePath);
+ LOG.info("Submit import FEED entity with datasource {} via entity -submit -type feed -file {}",
+ dsName, filePath);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
}
@@ -222,7 +248,31 @@ public class FeedImportIT {
Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
- LOG.info("Submit feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName, filePath);
+ LOG.info("Submit import feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName,
+ filePath);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+ }
+
+ @Test
+ public void testSqoopHCatImport() throws Exception {
+ TestContext context = new TestContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+ String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ context.setCluster(filePath);
+ LOG.info("entity -submit -type cluster -file " + filePath);
+ Assert.assertEquals(TestContext.executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+
+ // Put a new datasource name into the overlay so that DATASOURCE_TEMPLATE1 and FEED_TEMPLATE5
+ // are populated with the same datasource name
+ String dsName = "datasource-test-1";
+ overlay.put(DATASOURCE_NAME_KEY, dsName);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE1, overlay);
+ LOG.info("Submit datatsource entity {} via entity -submit -type datasource -file {}", dsName, filePath);
+ Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE5, overlay);
+ LOG.info("Submit import feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName,
+ filePath);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java
index 413dfde..ed27306 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java
@@ -33,7 +33,8 @@ public abstract class AbstractTestContext {
public static final String FEED_TEMPLATE1 = "/feed-template1.xml";
public static final String FEED_TEMPLATE2 = "/feed-template2.xml";
public static final String FEED_TEMPLATE3 = "/feed-template3.xml";
-
+ public static final String FEED_TEMPLATE5 = "/feed-template5.xml";
+ public static final String FEED_EXPORT_TEMPLATE6 = "/feed-export-template6.xml";
public static final String PROCESS_TEMPLATE = "/process-template.xml";
protected static void mkdir(FileSystem fileSystem, Path path) throws Exception {
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/resources/feed-export-template6.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/feed-export-template6.xml b/webapp/src/test/resources/feed-export-template6.xml
new file mode 100644
index 0000000..0eb748b
--- /dev/null
+++ b/webapp/src/test/resources/feed-export-template6.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1">
+ <groups>input</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="##cluster##" type="source">
+ <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/>
+ <retention limit="hours(24)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ <export>
+ <target name="##datasourcename##" tableName="simple_export">
+ <load type="allowinsert"/>
+ <fields>
+ <includes>
+ <field>id</field>
+ <field>name</field>
+ </includes>
+ </fields>
+ </target>
+ <arguments>
+ <argument name="--update-key" value="id"/>
+ </arguments>
+ </export>
+ </cluster>
+ </clusters>
+
+ <locations>
+ <location type="data" path="/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/"/>
+ <location type="stats" path="/projects/falcon/clicksStats"/>
+ <location type="meta" path="/projects/falcon/clicksMetaData"/>
+ </locations>
+
+ <ACL owner="##user##" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/resources/feed-template5.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/feed-template5.xml b/webapp/src/test/resources/feed-template5.xml
new file mode 100644
index 0000000..150ce87
--- /dev/null
+++ b/webapp/src/test/resources/feed-template5.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1">
+ <groups>input</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="##cluster##" type="source">
+ <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/>
+ <retention limit="hours(24)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ <import>
+ <source name="##datasourcename##" tableName="simple">
+ <extract type="full">
+ <mergepolicy>snapshot</mergepolicy>
+ </extract>
+ <fields>
+ <includes>
+ <field>id</field>
+ <field>name</field>
+ </includes>
+ </fields>
+ </source>
+ <arguments>
+ <argument name="--split-by" value="id"/>
+ <argument name="--num-mappers" value="2"/>
+ </arguments>
+ </import>
+ </cluster>
+ </clusters>
+
+ <table uri="catalog:SqoopTestDB:SqoopTestTable#year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}"/>
+
+ <ACL owner="##user##" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
\ No newline at end of file