You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/01 17:44:36 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-628] Zuora Connector
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4a18d99 [GOBBLIN-628] Zuora Connector
4a18d99 is described below
commit 4a18d99b0762b258d6f441670d2c8308ec16fa00
Author: Chen Guo <al...@gmail.com>
AuthorDate: Fri Feb 1 09:44:08 2019 -0800
[GOBBLIN-628] Zuora Connector
Closes #2498 from abti/zuora
---
gobblin-modules/gobblin-zuora/.gitignore | 0
gobblin-modules/gobblin-zuora/build.gradle | 66 +++++
.../java/org/apache/gobblin/zuora/ZuoraClient.java | 46 ++++
.../gobblin/zuora/ZuoraClientFilesStreamer.java | 181 ++++++++++++++
.../org/apache/gobblin/zuora/ZuoraClientImpl.java | 244 ++++++++++++++++++
.../gobblin/zuora/ZuoraConfigurationKeys.java | 52 ++++
.../apache/gobblin/zuora/ZuoraDeletedColumn.java | 41 ++++
.../org/apache/gobblin/zuora/ZuoraExtractor.java | 273 +++++++++++++++++++++
.../java/org/apache/gobblin/zuora/ZuoraParams.java | 122 +++++++++
.../java/org/apache/gobblin/zuora/ZuoraQuery.java | 69 ++++++
.../java/org/apache/gobblin/zuora/ZuoraSource.java | 51 ++++
.../java/org/apache/gobblin/zuora/ZuoraUtil.java | 116 +++++++++
.../src/main/resources/zuora_sample.pull | 54 ++++
13 files changed, 1315 insertions(+)
diff --git a/gobblin-modules/gobblin-zuora/.gitignore b/gobblin-modules/gobblin-zuora/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/gobblin-modules/gobblin-zuora/build.gradle b/gobblin-modules/gobblin-zuora/build.gradle
new file mode 100644
index 0000000..ab64afe
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/build.gradle
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build definition for the gobblin-zuora module (Zuora connector).
+apply plugin: 'java'
+
+// NOTE(review): the Java sources of this module import com.google.gson, com.github.rholder.retry and
+// org.apache.commons.lang (commons-lang 2.x), none of which is declared directly below. Because
+// `transitive = false` is set on the compile configuration, confirm they still reach the compile
+// classpath — TODO declare them explicitly if they do not.
+dependencies {
+ compile project(":gobblin-api")
+ compile project(":gobblin-core-base")
+ compile project(":gobblin-utility")
+ compile project(":gobblin-metrics-libs:gobblin-metrics")
+ compile project(":gobblin-modules:gobblin-sql")
+
+ compile externalDependency.avro
+ compile externalDependency.jacksonCore
+ compile externalDependency.jacksonMapper
+ compile externalDependency.commonsHttpClient
+ compile externalDependency.commonsPool
+ compile externalDependency.commonsLang3
+ compile externalDependency.guava
+ compile externalDependency.slf4j
+ compile externalDependency.httpclient
+ compile externalDependency.httpcore
+ compile externalDependency.lombok
+ compile externalDependency.metricsCore
+ compile externalDependency.typesafeConfig
+ compile externalDependency.findBugsAnnotations
+
+ testCompile project(":gobblin-runtime")
+ testCompile project(":gobblin-test-utils")
+ testCompile externalDependency.jsonAssert
+ testCompile externalDependency.mockito
+ testCompile externalDependency.testng
+}
+
+configurations {
+ // Resolve only the dependencies declared explicitly above; their transitive dependencies are not pulled in.
+ compile { transitive = false }
+ // Remove xerces dependencies because of versioning issues. Standard JRE implementation should
+ // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven
+ // HADOOP-5254 and MAPREDUCE-5664
+ all*.exclude group: 'xml-apis'
+ all*.exclude group: 'xerces'
+}
+
+test {
+ // Run tests with the repository root as the working directory.
+ workingDir rootProject.rootDir
+}
+
+tasks.withType(Test) {
+ // Never run test classes in parallel forked JVMs.
+ maxParallelForks = 1
+}
+
+ext.classification="library"
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClient.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClient.java
new file mode 100644
index 0000000..abc7307
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClient.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+
+
+/**
+ * Abstraction over the Zuora REST calls used by the Zuora connector: submitting a query, looking up the
+ * result files of a query job, and issuing raw GET requests.
+ */
+@Alpha
+public interface ZuoraClient {
+
+  /**
+   * Builds the POST command(s) for the configured query.
+   *
+   * @param predicateList predicates to apply to the query; may be {@code null}
+   * @return the commands to execute
+   */
+  List<Command> buildPostCommand(List<Predicate> predicateList);
+
+  /**
+   * Executes a POST command, typically one produced by {@link #buildPostCommand(List)}.
+   *
+   * @param command the command to run
+   * @return the raw response keyed by command
+   * @throws DataRecordException if the request cannot be completed
+   */
+  CommandOutput<RestApiCommand, String> executePostRequest(Command command)
+      throws DataRecordException;
+
+  /**
+   * Looks up the ids of the result files produced by a query job.
+   *
+   * @param jobId id of the submitted job
+   * @return the file ids
+   * @throws DataRecordException if the ids cannot be obtained
+   * @throws IOException on I/O failure
+   */
+  List<String> getFileIds(String jobId)
+      throws DataRecordException, IOException;
+
+  /**
+   * Executes a GET command.
+   *
+   * @param cmd the command to run
+   * @return the raw response keyed by command
+   * @throws Exception if the request fails
+   */
+  CommandOutput<RestApiCommand, String> executeGetRequest(Command cmd)
+      throws Exception;
+
+  /**
+   * Resolves a path relative to the configured Zuora host into a full URL.
+   *
+   * @param relativeUrl path relative to the host
+   * @return the absolute endpoint URL
+   */
+  String getEndPoint(String relativeUrl);
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientFilesStreamer.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientFilesStreamer.java
new file mode 100644
index 0000000..79a8294
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientFilesStreamer.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+import javax.net.ssl.HttpsURLConnection;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.resultset.RecordSet;
+import org.apache.gobblin.source.extractor.resultset.RecordSetList;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.gson.JsonElement;
+
+
+/**
+ * Streams the CSV result files of a Zuora query job, one batch of records per call.
+ *
+ * <p>Files are fetched one at a time from the {@code file/<fileId>} endpoint (gunzipped when
+ * {@link ZuoraConfigurationKeys#ZUORA_OUTPUT_FORMAT} is {@code gzip}). The header line of each file is skipped.
+ * Every remaining row is converted to a JSON object using the supplied header.
+ */
+@Alpha
+@Slf4j
+public class ZuoraClientFilesStreamer {
+  private final String outputFormat;
+  private final WorkUnitState _workUnitState;
+  private final ZuoraClient _client;
+  private final int batchSize;
+  private final Retryer<Void> _getRetryer;
+
+  private boolean _jobFinished = false;
+  private boolean _jobFailed = false;
+  private long _totalRecords = 0;
+
+  private BufferedReader _currentReader;
+  // A single CSV reader per file, reused across batches so that any look-ahead state it may keep is not lost.
+  private InputStreamCSVReader _currentCsvReader;
+  private HttpsURLConnection _currentConnection;
+  private int _currentFileIndex = -1;
+  // True until the header line of the current file has been consumed.
+  private boolean _headerPending = false;
+  // True once the CSV reader of the current file has reported end of stream.
+  private boolean _currentReaderExhausted = false;
+
+  public ZuoraClientFilesStreamer(WorkUnitState workUnitState, ZuoraClient client) {
+    _workUnitState = workUnitState;
+    _client = client;
+    batchSize = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE, 2000);
+    outputFormat = _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_OUTPUT_FORMAT);
+    _getRetryer = RetryerBuilder.<Void>newBuilder().retryIfExceptionOfType(IOException.class).withStopStrategy(
+        StopStrategies
+            .stopAfterAttempt(workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_STREAM_FILES_COUNT, 3)))
+        .withWaitStrategy(WaitStrategies
+            .fixedWait(workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_STREAM_FILES_WAIT_TIME, 10000),
+                TimeUnit.MILLISECONDS)).build();
+  }
+
+  /**
+   * Returns the next batch of records, moving on to the next file when the current one is exhausted.
+   *
+   * <p>A non-empty batch is returned while data remains. An empty record set is returned only once every file
+   * has been streamed, at which point {@link #isJobFinished()} is {@code true}. Files that contain no data rows
+   * are skipped rather than surfacing as an empty batch in the middle of the stream.
+   *
+   * @param fileList ids of the files to stream, in order
+   * @param header column names used to build each JSON record
+   * @return the next batch (at most the configured fetch size, but at least one record while data remains)
+   * @throws DataRecordException if a file cannot be opened or read; {@link #isJobFailed()} is set on read errors
+   */
+  public RecordSet<JsonElement> streamFiles(List<String> fileList, List<String> header)
+      throws DataRecordException {
+    try {
+      while (true) {
+        if (currentReaderDone()) {
+          ++_currentFileIndex;
+          closeCurrentSession();
+          if (_currentFileIndex >= fileList.size()) {
+            log.info("Finished streaming all files.");
+            _jobFinished = true;
+            return new RecordSetList<>();
+          }
+          initializeForNewFile(fileList);
+        }
+        log.info(String
+            .format("Streaming file at index %s with id %s ...", _currentFileIndex, fileList.get(_currentFileIndex)));
+
+        if (_headerPending) {
+          _headerPending = false;
+          if (_currentCsvReader.nextRecord() == null) { //skip header; a completely empty file has none
+            _currentReaderExhausted = true;
+            continue;
+          }
+        }
+
+        RecordSetList<JsonElement> rs = new RecordSetList<>();
+        int count = readBatch(header, rs);
+        log.info("Total number of records downloaded: " + _totalRecords);
+        if (count > 0) {
+          return rs;
+        }
+        // Nothing left in this file: loop around and open the next one instead of returning an empty batch.
+      }
+    } catch (IOException e) {
+      try {
+        closeCurrentSession();
+      } catch (IOException e1) {
+        log.error("Failed to close the current Zuora file stream.", e1);
+      }
+      _jobFailed = true;
+      throw new DataRecordException("Failed to get records from Zuora: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Reads up to {@code batchSize} records (at least one if available) of the current file into {@code rs}.
+   * Marks the current file as exhausted when its end is reached.
+   *
+   * @return the number of records added
+   */
+  private int readBatch(List<String> header, RecordSetList<JsonElement> rs)
+      throws IOException {
+    int count = 0;
+    List<String> csvRecord;
+    while ((csvRecord = _currentCsvReader.nextRecord()) != null) {
+      rs.add(Utils.csvToJsonObject(header, csvRecord, header.size()));
+      ++_totalRecords;
+      if (++count >= batchSize) {
+        return count;
+      }
+    }
+    _currentReaderExhausted = true;
+    return count;
+  }
+
+  /** Opens the file at {@code _currentFileIndex}, retrying on I/O errors as configured. */
+  private void initializeForNewFile(List<String> fileList)
+      throws DataRecordException {
+    final String fileId = fileList.get(_currentFileIndex);
+    log.info(String.format("Start streaming file at index %s with id %s", _currentFileIndex, fileId));
+
+    try {
+      _getRetryer.call(new Callable<Void>() {
+        @Override
+        public Void call()
+            throws Exception {
+          Pair<HttpsURLConnection, BufferedReader> initialized = createReader(fileId, _workUnitState);
+          _currentConnection = initialized.getLeft();
+          _currentReader = initialized.getRight();
+          _currentCsvReader = new InputStreamCSVReader(_currentReader);
+          _currentReaderExhausted = false;
+          _headerPending = true;
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      throw new DataRecordException(
+          String.format("Retryer failed: Build connection for streaming failed for file id: %s", fileId), e);
+    }
+  }
+
+  /** Opens an HTTPS connection to the given file and wraps its body in a UTF-8 reader. */
+  private Pair<HttpsURLConnection, BufferedReader> createReader(String fileId, WorkUnitState workUnitState)
+      throws IOException {
+    HttpsURLConnection connection = ZuoraUtil.getConnection(_client.getEndPoint("file/" + fileId), workUnitState);
+    try {
+      connection.setRequestProperty("Accept", "application/json");
+      InputStream stream = connection.getInputStream();
+      if (StringUtils.isNotBlank(outputFormat) && outputFormat.equalsIgnoreCase("gzip")) {
+        stream = new GZIPInputStream(stream);
+      }
+      return new ImmutablePair<>(connection, new BufferedReader(new InputStreamReader(stream, "UTF-8")));
+    } catch (IOException | RuntimeException e) {
+      // Do not leak the connection of a failed attempt; the retryer opens a fresh one.
+      connection.disconnect();
+      throw e;
+    }
+  }
+
+  /** Closes the current reader and connection (if any) and clears the per-file state. */
+  private void closeCurrentSession()
+      throws IOException {
+    try {
+      if (_currentReader != null) {
+        _currentReader.close();
+      }
+    } finally {
+      _currentReader = null;
+      _currentCsvReader = null;
+      if (_currentConnection != null) {
+        _currentConnection.disconnect();
+        _currentConnection = null;
+      }
+    }
+  }
+
+  /**
+   * Whether a new file must be opened. This relies on the CSV reader having reported end of stream, not on
+   * {@link BufferedReader#ready()}: on a network stream, ready() is false whenever no bytes happen to be buffered.
+   */
+  private boolean currentReaderDone() {
+    return _currentReader == null || _currentReaderExhausted;
+  }
+
+  public boolean isJobFinished() {
+    return _jobFinished;
+  }
+
+  public boolean isJobFailed() {
+    return _jobFailed;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientImpl.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientImpl.java
new file mode 100644
index 0000000..3f6c0bf
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.HttpsURLConnection;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommandOutput;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.jdbc.SqlQueryUtils;
+
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+
+/**
+ * Default {@link ZuoraClient} backed by Zuora's batch-query REST endpoints.
+ *
+ * <p>A query job is submitted by POSTing to {@code batch-query/}. Its status is then polled at
+ * {@code batch-query/jobs/<jobId>} until it reports {@code completed}, at which point the ids of its result
+ * files are returned. POST and polling calls are retried on {@link IOException} with the fixed attempt counts
+ * and waits configured through {@link ZuoraConfigurationKeys}.
+ */
+@Alpha
+@Slf4j
+class ZuoraClientImpl implements ZuoraClient {
+  private static final Gson GSON = new Gson();
+  /**
+   * Job statuses from which polling can never reach "completed", so waiting is pointless.
+   * NOTE(review): based on the Zuora AQuA job status values — confirm against the current API documentation.
+   */
+  private static final List<String> TERMINAL_FAILURE_STATUSES =
+      Collections.unmodifiableList(Lists.newArrayList("error", "aborted", "cancelled", "canceled"));
+
+  private final WorkUnitState _workUnitState;
+  private final String _hostName;
+  private final Retryer<CommandOutput<RestApiCommand, String>> _postRetryer;
+  private final Retryer<List<String>> _getRetryer;
+
+  ZuoraClientImpl(WorkUnitState workUnitState) {
+    _workUnitState = workUnitState;
+    _hostName = _workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
+    _postRetryer =
+        RetryerBuilder.<CommandOutput<RestApiCommand, String>>newBuilder().retryIfExceptionOfType(IOException.class)
+            .withStopStrategy(StopStrategies
+                .stopAfterAttempt(workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_POST_COUNT, 20)))
+            .withWaitStrategy(WaitStrategies
+                .fixedWait(workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_POST_WAIT_TIME, 60000),
+                    TimeUnit.MILLISECONDS)).build();
+    _getRetryer = RetryerBuilder.<List<String>>newBuilder().retryIfExceptionOfType(IOException.class).withStopStrategy(
+        StopStrategies
+            .stopAfterAttempt(workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_GET_FILES_COUNT, 30)))
+        .withWaitStrategy(WaitStrategies
+            .fixedWait(workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_GET_FILES_WAIT_TIME, 30000),
+                TimeUnit.MILLISECONDS)).build();
+  }
+
+  /**
+   * Builds the single POST command that submits the configured query.
+   * The query defaults to {@code SELECT * FROM <entity>}. The given predicates and an optional
+   * {@code LIMIT} are appended to it.
+   */
+  @Override
+  public List<Command> buildPostCommand(List<Predicate> predicateList) {
+    String host = getEndPoint("batch-query/");
+    List<String> params = Lists.newLinkedList();
+    params.add(host);
+
+    String query = _workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_QUERY,
+        "SELECT * FROM " + _workUnitState.getProp(ConfigurationKeys.SOURCE_ENTITY));
+
+    if (predicateList != null) {
+      for (Predicate predicate : predicateList) {
+        query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
+      }
+    }
+
+    String rowLimit = _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_ROW_LIMIT);
+    if (StringUtils.isNotBlank(rowLimit)) {
+      query += " LIMIT " + rowLimit;
+    }
+
+    List<ZuoraQuery> queries = Lists.newArrayList();
+    queries.add(new ZuoraQuery(_workUnitState.getProp(ConfigurationKeys.JOB_NAME_KEY), query,
+        _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_DELTED_COLUMN, "")));
+    ZuoraParams filterPayload = new ZuoraParams(_workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_PARTNER, "sample"),
+        _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_PROJECT, "sample"), queries,
+        _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_API_NAME, "sample"),
+        _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_OUTPUT_FORMAT, "csv"),
+        _workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_VERSION, "1.1"));
+    params.add(GSON.toJson(filterPayload));
+    return Collections.singletonList(new RestApiCommand().build(params, RestApiCommand.RestApiCommandType.POST));
+  }
+
+  /** Executes the POST command, retrying on {@link IOException} as configured. */
+  @Override
+  public CommandOutput<RestApiCommand, String> executePostRequest(final Command command)
+      throws DataRecordException {
+    try {
+      return _postRetryer.call(new Callable<CommandOutput<RestApiCommand, String>>() {
+        @Override
+        public CommandOutput<RestApiCommand, String> call()
+            throws Exception {
+          return executePostRequestInternal(command);
+        }
+      });
+    } catch (Exception e) {
+      throw new DataRecordException("Post request failed for command: " + command.toString(), e);
+    }
+  }
+
+  /**
+   * Extracts the job id (the {@code id} field) from the JSON body of a batch-query POST response.
+   *
+   * @throws DataRecordException if the response is empty or carries no job id
+   */
+  @SuppressWarnings("unchecked") // results of a POST command are String responses
+  public static String getJobId(CommandOutput<?, ?> postResponse)
+      throws DataRecordException {
+    Iterator<String> itr = (Iterator<String>) postResponse.getResults().values().iterator();
+    if (!itr.hasNext()) {
+      throw new DataRecordException("Failed to get data from Zuora; REST postResponse has no output");
+    }
+
+    String stringResponse = itr.next();
+    log.info("Zuora post response: " + stringResponse);
+    JsonObject jsonObject = GSON.fromJson(stringResponse, JsonObject.class);
+    JsonElement id = jsonObject == null ? null : jsonObject.get("id");
+    if (id == null || id.isJsonNull()) {
+      throw new DataRecordException("Zuora post response does not contain a job id: " + stringResponse);
+    }
+    return id.getAsString();
+  }
+
+  /** Polls the job until it completes (retrying as configured) and returns the ids of its result files. */
+  @Override
+  public List<String> getFileIds(final String jobId)
+      throws DataRecordException, IOException {
+    log.info("Getting files for job " + jobId);
+    String url = getEndPoint("batch-query/jobs/" + jobId);
+    final Command cmd = new RestApiCommand().build(Collections.singleton(url), RestApiCommand.RestApiCommandType.GET);
+
+    try {
+      return _getRetryer.call(new Callable<List<String>>() {
+        @Override
+        public List<String> call()
+            throws Exception {
+          return executeGetRequestInternal(cmd, jobId);
+        }
+      });
+    } catch (Exception e) {
+      throw new DataRecordException("Get request failed for command: " + cmd.toString(), e);
+    }
+  }
+
+  /**
+   * One polling attempt. Throws {@link IOException} (retried) while the job is still running, and
+   * {@link DataRecordException} (not retried) when the response is malformed or the job has failed.
+   */
+  private List<String> executeGetRequestInternal(Command cmd, String jobId)
+      throws IOException, DataRecordException {
+    CommandOutput<RestApiCommand, String> response = executeGetRequest(cmd);
+    Iterator<String> itr = response.getResults().values().iterator();
+    if (!itr.hasNext()) {
+      throw new DataRecordException("Failed to get file Ids based on job id " + jobId);
+    }
+    String output = itr.next();
+    JsonObject jsonResp = GSON.fromJson(output, JsonObject.class);
+    JsonElement statusElement = jsonResp == null ? null : jsonResp.get("status");
+    if (statusElement == null || statusElement.isJsonNull()) {
+      throw new DataRecordException(String.format("Zuora job %s response has no status: %s", jobId, output));
+    }
+    String status = statusElement.getAsString();
+    log.info(String.format("Job %s %s: %s", jobId, status, output));
+    if (TERMINAL_FAILURE_STATUSES.contains(status)) {
+      // Fail fast: retrying cannot turn a failed job into a completed one.
+      throw new DataRecordException(String.format("Zuora job %s ended with status %s: %s", jobId, status, output));
+    }
+    if (!status.equals("completed")) {
+      throw new IOException("Retrying... This exception will be handled by retryer.");
+    }
+    JsonElement batches = jsonResp.get("batches");
+    if (batches == null || !batches.isJsonArray()) {
+      throw new DataRecordException(String.format("Completed Zuora job %s has no batches: %s", jobId, output));
+    }
+    List<String> fileIds = Lists.newArrayList();
+    for (JsonElement jsonObj : batches.getAsJsonArray()) {
+      fileIds.add(jsonObj.getAsJsonObject().get("fileId").getAsString());
+    }
+    log.info("Get Files Response - FileIds: " + fileIds);
+    return fileIds;
+  }
+
+  /** Issues a GET to the URL in the command's first parameter and returns the response body. */
+  @Override
+  public CommandOutput<RestApiCommand, String> executeGetRequest(final Command cmd)
+      throws IOException {
+    HttpsURLConnection connection = null;
+    try {
+      String urlPath = cmd.getParams().get(0);
+      connection = ZuoraUtil.getConnection(urlPath, _workUnitState);
+      connection.setRequestProperty("Accept", "application/json");
+
+      String result = ZuoraUtil.getStringFromInputStream(connection.getInputStream());
+      CommandOutput<RestApiCommand, String> output = new RestApiCommandOutput();
+      output.put((RestApiCommand) cmd, result);
+      return output;
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    }
+  }
+
+  /**
+   * POSTs the payload (second command parameter) to the URL (first command parameter) and returns the
+   * response body with line breaks removed.
+   */
+  private CommandOutput<RestApiCommand, String> executePostRequestInternal(Command command)
+      throws IOException {
+    List<String> params = command.getParams();
+    String payLoad = params.get(1);
+    log.info("Executing post request with payLoad:" + payLoad);
+
+    HttpsURLConnection connection = null;
+    try {
+      connection = ZuoraUtil.getConnection(params.get(0), _workUnitState);
+      connection.setDoOutput(true);
+      connection.setRequestMethod("POST");
+
+      // Close the request body stream before reading the response; it was previously never closed.
+      try (OutputStream os = connection.getOutputStream()) {
+        os.write(payLoad.getBytes("UTF-8"));
+        os.flush();
+      }
+
+      StringBuilder result = new StringBuilder();
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"))) {
+        String line;
+        while ((line = br.readLine()) != null) {
+          result.append(line);
+        }
+      }
+      CommandOutput<RestApiCommand, String> output = new RestApiCommandOutput();
+      output.put((RestApiCommand) command, result.toString());
+      return output;
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    }
+  }
+
+  /**
+   * Joins the configured host name and a relative path. A single '/' is inserted when the host is configured
+   * without a trailing slash and the path does not start with one. Otherwise the two parts are concatenated as-is.
+   */
+  @Override
+  public String getEndPoint(String relativeUrl) {
+    if (_hostName != null && !_hostName.endsWith("/") && relativeUrl != null && !relativeUrl.startsWith("/")) {
+      return _hostName + "/" + relativeUrl;
+    }
+    return _hostName + relativeUrl;
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraConfigurationKeys.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraConfigurationKeys.java
new file mode 100644
index 0000000..7e644cc
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraConfigurationKeys.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Configuration keys understood by the Zuora connector. This class holds constants only and cannot be
+ * instantiated.
+ */
+@Alpha
+public final class ZuoraConfigurationKeys {
+  private ZuoraConfigurationKeys() {
+  }
+
+  /** Result file format requested from Zuora. The client sends "csv" when unset; "gzip" results are gunzipped. */
+  public static final String ZUORA_OUTPUT_FORMAT = "zuora.output.format";
+  /** API name sent in the query payload (the client sends "sample" when unset). */
+  public static final String ZUORA_API_NAME = "zuora.api.name";
+  /** Partner sent in the query payload (the client sends "sample" when unset). */
+  public static final String ZUORA_PARTNER = "zuora.partner";
+  /** Project sent in the query payload (the client sends "sample" when unset). */
+  public static final String ZUORA_PROJECT = "zuora.project";
+
+  /**
+   * Name of the column that flags deleted records, for example {@code zuora.deleted.column=IsDeleted}.
+   * When it is set, the schema must be changed accordingly: the column below has to be included as the first
+   * column of the schema definition.
+   * { "columnName":"IsDeleted", "isNullable":"FALSE", "dataType":{"type":"boolean"}, "comment":"" }
+   *
+   * Check the documentation at
+   * https://knowledgecenter.zuora.com/DC_Developers/T_Aggregate_Query_API/BA_Stateless_and_Stateful_Modes
+   */
+  public static final String ZUORA_DELETED_COLUMN = "zuora.deleted.column";
+
+  /**
+   * @deprecated misspelled name kept for source compatibility; use {@link #ZUORA_DELETED_COLUMN} (same key).
+   */
+  @Deprecated
+  public static final String ZUORA_DELTED_COLUMN = ZUORA_DELETED_COLUMN;
+
+  public static final String ZUORA_TIMESTAMP_COLUMNS = "zuora.timestamp.columns";
+  /** When non-blank, appended to the query as {@code LIMIT <value>}. */
+  public static final String ZUORA_ROW_LIMIT = "zuora.row.limit";
+
+  /** Attempts for submitting the query (client default 20). */
+  public static final String ZUORA_API_RETRY_POST_COUNT = "zuora.api.retry.post.count";
+  /** Wait between query submission attempts, in milliseconds (client default 60000). */
+  public static final String ZUORA_API_RETRY_POST_WAIT_TIME = "zuora.api.retry.post.wait_time";
+  /** Attempts for polling a job for its result files (client default 30). */
+  public static final String ZUORA_API_RETRY_GET_FILES_COUNT = "zuora.api.retry.get_files.count";
+  /** Wait between job polling attempts, in milliseconds (client default 30000). */
+  public static final String ZUORA_API_RETRY_GET_FILES_WAIT_TIME = "zuora.api.retry.get_files.wait_time";
+  /** Attempts for opening each result file (streamer default 3). */
+  public static final String ZUORA_API_RETRY_STREAM_FILES_COUNT = "zuora.api.retry.stream_files.count";
+  /** Wait between attempts to open a result file, in milliseconds (streamer default 10000). */
+  public static final String ZUORA_API_RETRY_STREAM_FILES_WAIT_TIME = "zuora.api.retry.stream_files.wait_time";
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraDeletedColumn.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraDeletedColumn.java
new file mode 100644
index 0000000..9483123
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraDeletedColumn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.Serializable;
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Value holder naming the column that flags deleted records.
+ *
+ * <p>NOTE(review): presumably serialized as part of the query payload (see ZuoraQuery). If so, the field
+ * name {@code column} is part of the wire format and must not be renamed.
+ */
+@Alpha
+public class ZuoraDeletedColumn implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  // Name of the deleted-flag column.
+  String column;
+
+  ZuoraDeletedColumn(String columnName) {
+    this.column = columnName;
+  }
+
+  /** @return the name of the deleted-flag column */
+  public String getColumn() {
+    return this.column;
+  }
+
+  /** @param column the new name of the deleted-flag column */
+  public void setColumn(String column) {
+    this.column = column;
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraExtractor.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraExtractor.java
new file mode 100644
index 0000000..0eef4a6
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraExtractor.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.exception.HighWatermarkException;
+import org.apache.gobblin.source.extractor.exception.RecordCountException;
+import org.apache.gobblin.source.extractor.exception.SchemaException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.QueryBasedExtractor;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
+import org.apache.gobblin.source.extractor.schema.Schema;
+import org.apache.gobblin.source.extractor.utils.Utils;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+
+@Alpha
+@Slf4j
+public class ZuoraExtractor extends QueryBasedExtractor<JsonArray, JsonElement> {
+ private static final Gson GSON = new Gson();
+ private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
+ private static final String DATE_FORMAT = "yyyy-MM-dd";
+ private static final String HOUR_FORMAT = "HH";
+ private final ZuoraClient _client;
+ private ZuoraClientFilesStreamer _fileStreamer;
+ private List<String> _fileIds;
+ private List<String> _header;
+
+ public ZuoraExtractor(WorkUnitState workUnitState) {
+ super(workUnitState);
+ _client = new ZuoraClientImpl(workUnitState);
+ }
+
+ @Override
+ public Iterator<JsonElement> getRecordSet(String schema, String entity, WorkUnit workUnit,
+ List<Predicate> predicateList)
+ throws DataRecordException, IOException {
+ if (_fileStreamer == null || _fileStreamer.isJobFailed()) {
+ _fileStreamer = new ZuoraClientFilesStreamer(workUnitState, _client);
+ }
+
+ if (_fileIds == null) {
+ List<Command> cmds = _client.buildPostCommand(predicateList);
+ CommandOutput<RestApiCommand, String> postResponse = _client.executePostRequest(cmds.get(0));
+ String jobId = ZuoraClientImpl.getJobId(postResponse);
+ _fileIds = _client.getFileIds(jobId);
+ }
+
+ if (!_fileStreamer.isJobFinished()) {
+ return _fileStreamer.streamFiles(_fileIds, _header).iterator();
+ }
+
+ return null;
+ }
+
+ @Override
+ protected boolean isInitialPull() {
+ return _fileIds == null || _fileStreamer.isJobFailed();
+ }
+
+ @Override
+ public void extractMetadata(String schema, String entity, WorkUnit workUnit)
+ throws SchemaException, IOException {
+ String deltaFields = workUnit.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
+ String primaryKeyColumn = workUnit.getProp(ConfigurationKeys.EXTRACT_PRIMARY_KEY_FIELDS_KEY);
+ JsonArray columnArray = new JsonArray();
+ _header = new ArrayList<>();
+
+ try {
+ JsonArray array =
+ GSON.fromJson(workUnit.getProp(ConfigurationKeys.SOURCE_SCHEMA), JsonArray.class).getAsJsonArray();
+ for (JsonElement columnElement : array) {
+ Schema obj = GSON.fromJson(columnElement, Schema.class);
+ String columnName = obj.getColumnName();
+ _header.add(columnName);
+
+ boolean isWaterMarkColumn = isWatermarkColumn(deltaFields, columnName);
+ if (isWaterMarkColumn) {
+ obj.setWaterMark(true);
+ obj.setNullable(false);
+ }
+
+ int primarykeyIndex = getPrimarykeyIndex(primaryKeyColumn, columnName);
+ obj.setPrimaryKey(primarykeyIndex);
+ boolean isPrimaryKeyColumn = primarykeyIndex > 0;
+ if (isPrimaryKeyColumn) {
+ obj.setNullable(false);
+ }
+
+ String jsonStr = GSON.toJson(obj);
+ JsonObject jsonObject = GSON.fromJson(jsonStr, JsonObject.class).getAsJsonObject();
+ columnArray.add(jsonObject);
+ }
+
+ log.info("Update Schema is:" + columnArray);
+ setOutputSchema(columnArray);
+ } catch (Exception e) {
+ throw new SchemaException("Failed to get schema using rest api; error - " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public long getMaxWatermark(String schema, String entity, String watermarkColumn,
+ List<Predicate> snapshotPredicateList, String watermarkSourceFormat)
+ throws HighWatermarkException {
+ throw new HighWatermarkException(
+ "GetMaxWatermark with query is not supported! Please set source.querybased.skip.high.watermark.calc to true.");
+ }
+
+ @Override
+ public long getSourceCount(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList)
+ throws RecordCountException {
+ // Set source.querybased.skip.count.calc to true will set SourceCount to -1. However, ...
+
+ // This ExpectedRecordCount will determine tablesWithNoUpdatesOnPreviousRun in QueryBasedSource.
+ // We need to return a positive number to bypass this check and move Low watermark forward.
+
+ return 1;
+ }
+
+ @Override
+ public String getWatermarkSourceFormat(WatermarkType watermarkType) {
+ switch (watermarkType) {
+ case TIMESTAMP:
+ return TIMESTAMP_FORMAT;
+ case DATE:
+ return DATE_FORMAT;
+ case HOUR:
+ return HOUR_FORMAT;
+ default:
+ throw new RuntimeException("Watermark type " + watermarkType.toString() + " is not supported");
+ }
+ }
+
+ @Override
+ public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) {
+ String hourPredicate = String
+ .format("%s %s '%s'", column, operator, Utils.toDateTimeFormat(Long.toString(value), valueFormat, HOUR_FORMAT));
+ log.info("Hour predicate is: " + hourPredicate);
+
+ return hourPredicate;
+ }
+
+ @Override
+ public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) {
+ String datePredicate = String
+ .format("%s %s '%s'", column, operator, Utils.toDateTimeFormat(Long.toString(value), valueFormat, DATE_FORMAT));
+ log.info("Date predicate is: " + datePredicate);
+
+ return datePredicate;
+ }
+
+ @Override
+ public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) {
+ String timeStampPredicate = String.format("%s %s '%s'", column, operator,
+ Utils.toDateTimeFormat(Long.toString(value), valueFormat, TIMESTAMP_FORMAT));
+ log.info("Timestamp predicate is: " + timeStampPredicate);
+
+ return timeStampPredicate;
+ }
+
+ @Override
+ public Map<String, String> getDataTypeMap() {
+ Map<String, String> dataTypeMap =
+ ImmutableMap.<String, String>builder().put("date", "date").put("datetime", "timestamp").put("time", "time")
+ .put("string", "string").put("int", "int").put("long", "long").put("float", "float").put("double", "double")
+ .put("decimal", "double").put("varchar", "string").put("boolean", "boolean").build();
+
+ return dataTypeMap;
+ }
+
+ List<String> extractHeader(ArrayList<String> firstLine) {
+ List<String> header = ZuoraUtil.getHeader(firstLine);
+ if (StringUtils.isBlank(workUnitState.getProp(ConfigurationKeys.SOURCE_SCHEMA))) {
+ List<String> timeStampColumns = Lists.newArrayList();
+ String timeStampColumnString = workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_TIMESTAMP_COLUMNS);
+ if (StringUtils.isNotBlank(timeStampColumnString)) {
+ timeStampColumns = Arrays.asList(timeStampColumnString.toLowerCase().replaceAll(" ", "").split(","));
+ }
+ setSchema(header, timeStampColumns);
+ }
+ log.info("Record header: " + header);
+
+ return header;
+ }
+
+ private void setSchema(List<String> cols, List<String> timestampColumns) {
+ JsonArray columnArray = new JsonArray();
+ for (String columnName : cols) {
+ Schema obj = new Schema();
+ obj.setColumnName(columnName);
+ obj.setComment("resolved");
+ obj.setWaterMark(isWatermarkColumn(workUnit.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY), columnName));
+
+ if (isWatermarkColumn(workUnit.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY), columnName)) {
+ obj.setNullable(false);
+ obj.setDataType(convertDataType(columnName, "timestamp", null, null));
+ } else if (getPrimarykeyIndex(workUnit.getProp(ConfigurationKeys.EXTRACT_PRIMARY_KEY_FIELDS_KEY), columnName)
+ == 0) {
+ // set all columns as nullable except primary key and watermark columns
+ obj.setNullable(true);
+ }
+
+ if (timestampColumns != null && timestampColumns.contains(columnName.toLowerCase())) {
+ obj.setDataType(convertDataType(columnName, "timestamp", null, null));
+ }
+
+ obj.setPrimaryKey(
+ getPrimarykeyIndex(workUnit.getProp(ConfigurationKeys.EXTRACT_PRIMARY_KEY_FIELDS_KEY), columnName));
+
+ String jsonStr = GSON.toJson(obj);
+ JsonObject jsonObject = GSON.fromJson(jsonStr, JsonObject.class).getAsJsonObject();
+ columnArray.add(jsonObject);
+ }
+
+ log.info("Resolved Schema: " + columnArray);
+ this.setOutputSchema(columnArray);
+ }
+
+ @Override
+ public void closeConnection()
+ throws Exception {
+ }
+
+ @Override
+ public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit,
+ List<Predicate> predicateList)
+ throws IOException {
+ throw new RuntimeException("Not supported");
+ }
+
+ @Override
+ public void setTimeOut(int timeOut) {
+ // Ignore for now
+ }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraParams.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraParams.java
new file mode 100644
index 0000000..2d1c46d
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraParams.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Parameters of a Zuora query job request: identification (name, partner, project), the list of
+ * queries to run, and output options.
+ *
+ * <p>NOTE(review): presumably serialized to JSON as the request body by the client; confirm in
+ * ZuoraClientImpl. Field declaration order is kept stable because serializers emit fields in it.
+ */
+@Alpha
+public class ZuoraParams implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  String name;
+  String partner;
+  String project;
+  List<ZuoraQuery> queries;
+  String format;
+  String version;
+  // Option flags are carried as strings with these defaults.
+  String encrypted = "none";
+  String useQueryLabels = "false";
+  String dateTimeUtc = "true";
+
+  ZuoraParams(String partner, String project, List<ZuoraQuery> queries, String name, String format, String version) {
+    this.name = name;
+    this.partner = partner;
+    this.project = project;
+    this.queries = queries;
+    this.format = format;
+    this.version = version;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String newName) {
+    this.name = newName;
+  }
+
+  public String getPartner() {
+    return this.partner;
+  }
+
+  public void setPartner(String newPartner) {
+    this.partner = newPartner;
+  }
+
+  public String getProject() {
+    return this.project;
+  }
+
+  public void setProject(String newProject) {
+    this.project = newProject;
+  }
+
+  public List<ZuoraQuery> getQueries() {
+    return this.queries;
+  }
+
+  public void setQueries(List<ZuoraQuery> newQueries) {
+    this.queries = newQueries;
+  }
+
+  public String getFormat() {
+    return this.format;
+  }
+
+  public void setFormat(String newFormat) {
+    this.format = newFormat;
+  }
+
+  public String getVersion() {
+    return this.version;
+  }
+
+  public void setVersion(String newVersion) {
+    this.version = newVersion;
+  }
+
+  public String getEncrypted() {
+    return this.encrypted;
+  }
+
+  public void setEncrypted(String newEncrypted) {
+    this.encrypted = newEncrypted;
+  }
+
+  public String getUseQueryLabels() {
+    return this.useQueryLabels;
+  }
+
+  public void setUseQueryLabels(String newUseQueryLabels) {
+    this.useQueryLabels = newUseQueryLabels;
+  }
+
+  public String getDateTimeUtc() {
+    return this.dateTimeUtc;
+  }
+
+  public void setDateTimeUtc(String newDateTimeUtc) {
+    this.dateTimeUtc = newDateTimeUtc;
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraQuery.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraQuery.java
new file mode 100644
index 0000000..d703fcd
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraQuery.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.Serializable;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import com.google.common.base.Strings;
+
+
+/**
+ * One query inside a {@link ZuoraParams} job request: a name, the query text, the query type
+ * (default {@code zoqlexport}) and an optional deleted-record column.
+ */
+@Alpha
+public class ZuoraQuery implements Serializable {
+  private static final long serialVersionUID = 1L;
+  public String name;
+  public String query;
+  public String type = "zoqlexport";
+  /**
+   * Deleted-record column, or {@code null} when none is configured. See
+   * https://knowledgecenter.zuora.com/DC_Developers/T_Aggregate_Query_API/BA_Stateless_and_Stateful_Modes
+   */
+  public ZuoraDeletedColumn deleted;
+
+  /**
+   * @param name query name
+   * @param query query text
+   * @param deleteColumn deleted-flag column; {@code null} or empty leaves {@link #deleted} unset
+   */
+  ZuoraQuery(String name, String query, String deleteColumn) {
+    this.name = name;
+    this.query = query;
+    this.deleted = Strings.isNullOrEmpty(deleteColumn) ? null : new ZuoraDeletedColumn(deleteColumn);
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String newName) {
+    this.name = newName;
+  }
+
+  public String getQuery() {
+    return this.query;
+  }
+
+  public void setQuery(String newQuery) {
+    this.query = newQuery;
+  }
+
+  public String getType() {
+    return this.type;
+  }
+
+  public void setType(String newType) {
+    this.type = newType;
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraSource.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraSource.java
new file mode 100644
index 0000000..37b63b3
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
+import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+
+
+/**
+ * An implementation of Zuora source to get work units
+ */
+@Alpha
+public class ZuoraSource extends QueryBasedSource<JsonArray, JsonElement> {
+ private static final Logger LOG = LoggerFactory.getLogger(QueryBasedSource.class);
+
+ public Extractor<JsonArray, JsonElement> getExtractor(WorkUnitState state) throws IOException {
+ Extractor<JsonArray, JsonElement> extractor = null;
+ try {
+ extractor = new ZuoraExtractor(state).build();
+ } catch (ExtractPrepareException e) {
+ LOG.error("Failed to prepare extractor: error - " + e.getMessage());
+ throw new IOException(e);
+ }
+ return extractor;
+ }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraUtil.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraUtil.java
new file mode 100644
index 0000000..91edf5e
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraUtil.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import javax.net.ssl.HttpsURLConnection;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.password.PasswordManager;
+
+import com.google.common.collect.Lists;
+
+
+@Alpha
+@Slf4j
+public class ZuoraUtil {
+
+ private ZuoraUtil() {
+ }
+
+ public static HttpsURLConnection getConnection(String urlPath, WorkUnitState workUnitState)
+ throws IOException {
+ log.info("URL: " + urlPath);
+
+ URL url = new URL(urlPath);
+ HttpsURLConnection connection;
+ String proxyUrl = workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_USE_PROXY_URL);
+ if (StringUtils.isNotBlank(proxyUrl)) {
+ int proxyPort = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_CONN_USE_PROXY_PORT);
+ Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyUrl, proxyPort));
+ connection = (HttpsURLConnection) url.openConnection(proxy);
+ } else {
+ connection = (HttpsURLConnection) url.openConnection();
+ }
+
+ connection.setRequestProperty("Content-Type", "application/json");
+
+ String userName = workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME);
+ if (StringUtils.isNotBlank(userName)) {
+ String password =
+ PasswordManager.getInstance(workUnitState).readPassword(workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD));
+ String userpass = userName + ":" + password;
+ String basicAuth = "Basic " + new String(new Base64().encode(userpass.getBytes("UTF-8")), "UTF-8");
+ connection.setRequestProperty("Authorization", basicAuth);
+ }
+
+ connection.setConnectTimeout(workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_CONN_TIMEOUT, 30000));
+ return connection;
+ }
+
+ public static String getStringFromInputStream(InputStream is) {
+ BufferedReader br = null;
+ StringBuilder sb = new StringBuilder();
+ String line;
+ try {
+ br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+ while ((line = br.readLine()) != null) {
+ sb.append(line);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return sb.toString();
+ }
+
+ public static List<String> getHeader(ArrayList<String> cols) {
+ List<String> columns = Lists.newArrayList();
+ for (String col : cols) {
+ String[] colRefs = col.split(":");
+ String columnName;
+ if (colRefs.length >= 2) {
+ columnName = colRefs[1];
+ } else {
+ columnName = colRefs[0];
+ }
+ columns.add(columnName.replaceAll(" ", "").trim());
+ }
+ return columns;
+ }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/resources/zuora_sample.pull b/gobblin-modules/gobblin-zuora/src/main/resources/zuora_sample.pull
new file mode 100644
index 0000000..eace140
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/resources/zuora_sample.pull
@@ -0,0 +1,54 @@
+job.group=Zuora_Sample
+extract.namespace=Zuora_Sample
+source.class=org.apache.gobblin.zuora.ZuoraSource
+converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
+
+writer.destination.type=HDFS
+writer.output.format=AVRO
+writer.fs.uri=file://localhost/
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+
+source.querybased.extract.type=snapshot
+source.querybased.low.watermark.backup.secs=300
+source.max.number.of.partitions=1
+source.querybased.partition.interval=1
+source.querybased.fetch.size=10000
+source.querybased.schema=Zuora
+source.querybased.watermark.type=timestamp
+source.querybased.skip.high.watermark.calc=true
+source.querybased.start.value=CURRENTDAY-3
+# source.querybased.append.max.watermark.limit=CURRENTDATE
+
+source.conn.username=<USERNAME>
+source.conn.password=<PASSWORD RAW OR ENCRYPTED>
+encrypt.key.loc=<PASSWORD KEY FILE PATH>
+source.conn.host=https://www.zuora.com/apps/api/
+source.conn.version=1.2
+source.conn.timeout=30000
+
+zuora.output.format=gzip
+zuora.api.name=<ZUORA API NAME>
+zuora.partner=<ZUORA PARTNER>
+zuora.project=<ZUORA PROJECT>
+zuora.timestamp.columns=CreatedDate,UpdatedDate
+zuora.deleted.column=IsDeleted
+
+extract.table.type=snapshot_append
+extract.delta.fields=UpdatedDate
+extract.primary.key.fields=Id
+
+converter.avro.timestamp.format=yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss
+converter.avro.date.format=yyyy-MM-dd
+converter.avro.time.format=HH:mm:ss
+converter.avro.date.timezone=America/Los_Angeles
+converter.is.epoch.time.in.seconds=true
+source.timezone=America/Los_Angeles
+workunit.retry.enabled=true
+task.retry.intervalinsec=20
+task.maxretries=10
+
+writer.include.record.count.in.file.names=true
+
+# Proxy settings
+source.conn.use.proxy.url=<PROXY URL if any>
+source.conn.use.proxy.port=<PROXY PORT if any>
\ No newline at end of file