You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/01 17:44:36 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-628] Zuora Connector

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a18d99  [GOBBLIN-628] Zuora Connector
4a18d99 is described below

commit 4a18d99b0762b258d6f441670d2c8308ec16fa00
Author: Chen Guo <al...@gmail.com>
AuthorDate: Fri Feb 1 09:44:08 2019 -0800

    [GOBBLIN-628] Zuora Connector
    
    Closes #2498 from abti/zuora
---
 gobblin-modules/gobblin-zuora/.gitignore           |   0
 gobblin-modules/gobblin-zuora/build.gradle         |  66 +++++
 .../java/org/apache/gobblin/zuora/ZuoraClient.java |  46 ++++
 .../gobblin/zuora/ZuoraClientFilesStreamer.java    | 181 ++++++++++++++
 .../org/apache/gobblin/zuora/ZuoraClientImpl.java  | 244 ++++++++++++++++++
 .../gobblin/zuora/ZuoraConfigurationKeys.java      |  52 ++++
 .../apache/gobblin/zuora/ZuoraDeletedColumn.java   |  41 ++++
 .../org/apache/gobblin/zuora/ZuoraExtractor.java   | 273 +++++++++++++++++++++
 .../java/org/apache/gobblin/zuora/ZuoraParams.java | 122 +++++++++
 .../java/org/apache/gobblin/zuora/ZuoraQuery.java  |  69 ++++++
 .../java/org/apache/gobblin/zuora/ZuoraSource.java |  51 ++++
 .../java/org/apache/gobblin/zuora/ZuoraUtil.java   | 116 +++++++++
 .../src/main/resources/zuora_sample.pull           |  54 ++++
 13 files changed, 1315 insertions(+)

diff --git a/gobblin-modules/gobblin-zuora/.gitignore b/gobblin-modules/gobblin-zuora/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/gobblin-modules/gobblin-zuora/build.gradle b/gobblin-modules/gobblin-zuora/build.gradle
new file mode 100644
index 0000000..ab64afe
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/build.gradle
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build script for the gobblin-zuora module.
+apply plugin: 'java'
+
+dependencies {
+  // Other Gobblin projects this module compiles against.
+  compile project(":gobblin-api")
+  compile project(":gobblin-core-base")
+  compile project(":gobblin-utility")
+  compile project(":gobblin-metrics-libs:gobblin-metrics")
+  compile project(":gobblin-modules:gobblin-sql")
+
+  // External libraries; the externalDependency entries are defined outside this file.
+  compile externalDependency.avro
+  compile externalDependency.jacksonCore
+  compile externalDependency.jacksonMapper
+  compile externalDependency.commonsHttpClient
+  compile externalDependency.commonsPool
+  compile externalDependency.commonsLang3
+  compile externalDependency.guava
+  compile externalDependency.slf4j
+  compile externalDependency.httpclient
+  compile externalDependency.httpcore
+  compile externalDependency.lombok
+  compile externalDependency.metricsCore
+  compile externalDependency.typesafeConfig
+  compile externalDependency.findBugsAnnotations
+
+  // Test-only dependencies.
+  testCompile project(":gobblin-runtime")
+  testCompile project(":gobblin-test-utils")
+  testCompile externalDependency.jsonAssert
+  testCompile externalDependency.mockito
+  testCompile externalDependency.testng
+}
+
+configurations {
+  // Do not resolve transitive dependencies for the compile configuration.
+  compile { transitive = false }
+  // Remove xerces dependencies because of versioning issues. Standard JRE implementation should
+  // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven
+  // HADOOP-5254 and MAPREDUCE-5664
+  all*.exclude group: 'xml-apis'
+  all*.exclude group: 'xerces'
+}
+
+test {
+  // Run tests with the root project directory as the working directory.
+  workingDir rootProject.rootDir
+}
+
+tasks.withType(Test)  {
+  // Run test classes in a single forked JVM at a time (no parallel forks).
+  maxParallelForks = 1
+}
+
+// NOTE(review): presumably consumed by the root build to categorize this module - confirm.
+ext.classification="library"
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClient.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClient.java
new file mode 100644
index 0000000..abc7307
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClient.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+
+
+/**
+ * Client operations used by the Zuora connector to submit a batch query, look up the files a job produced and
+ * issue the underlying REST requests.
+ */
+@Alpha
+public interface ZuoraClient {
+
+  /**
+   * Builds the command(s) that submit a query to Zuora.
+   *
+   * @param predicateList predicates whose conditions are added to the query; the implementation in this module
+   *     accepts {@code null} and then adds no predicates
+   * @return the commands to execute with {@link #executePostRequest(Command)}
+   */
+  List<Command> buildPostCommand(List<Predicate> predicateList);
+
+  /**
+   * Executes a POST command and returns the raw response.
+   *
+   * @param command command produced by {@link #buildPostCommand(List)}
+   * @return the command output holding the response body
+   * @throws DataRecordException if the request fails
+   */
+  CommandOutput<RestApiCommand, String> executePostRequest(final Command command)
+      throws DataRecordException;
+
+  /**
+   * Returns the ids of the files produced by the given job.
+   *
+   * @param jobId id of a previously submitted job
+   * @return the file ids reported for the job
+   * @throws DataRecordException if the file ids cannot be obtained
+   * @throws IOException declared for implementations that surface I/O failures directly
+   */
+  List<String> getFileIds(final String jobId)
+      throws DataRecordException, IOException;
+
+  /**
+   * Executes a GET command and returns the raw response.
+   *
+   * @param cmd command whose first parameter is the URL to request
+   * @return the command output holding the response body
+   * @throws Exception if the request fails
+   */
+  CommandOutput<RestApiCommand, String> executeGetRequest(final Command cmd)
+      throws Exception;
+
+  /**
+   * Resolves a relative URL against the configured Zuora host.
+   *
+   * @param relativeUrl path relative to the host
+   * @return the full endpoint URL
+   */
+  String getEndPoint(String relativeUrl);
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientFilesStreamer.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientFilesStreamer.java
new file mode 100644
index 0000000..79a8294
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientFilesStreamer.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+import javax.net.ssl.HttpsURLConnection;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.resultset.RecordSet;
+import org.apache.gobblin.source.extractor.resultset.RecordSetList;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.gson.JsonElement;
+
+
+@Alpha
+@Slf4j
+public class ZuoraClientFilesStreamer {
+  private final String outputFormat;
+  private final WorkUnitState _workUnitState;
+  private final ZuoraClient _client;
+  private final int batchSize;
+  private final Retryer<Void> _getRetryer;
+
+  private boolean _jobFinished = false;
+  private boolean _jobFailed = false;
+  private long _totalRecords = 0;
+
+  private BufferedReader _currentReader;
+  private int _currentFileIndex = -1;
+  private int _skipHeaderIndex = 0; //Indicate whether the header has been skipped for a file.
+  private HttpsURLConnection _currentConnection;
+
+  /**
+   * Creates a streamer that downloads Zuora export files through {@code client}.
+   *
+   * <p>Reads from {@code workUnitState}: the fetch size used as batch size (default 2000), the Zuora output
+   * format, and the attempt count (default 3) and fixed wait in milliseconds (default 10000) of the retryer that
+   * opens a file stream. Only {@link IOException}s are retried.
+   *
+   * @param workUnitState configuration of the current work unit
+   * @param client client used to resolve file endpoints
+   */
+  public ZuoraClientFilesStreamer(WorkUnitState workUnitState, ZuoraClient client) {
+    _workUnitState = workUnitState;
+    _client = client;
+    batchSize = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE, 2000);
+    outputFormat = workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_OUTPUT_FORMAT);
+
+    // Build the retryer step by step; each property is read right before the strategy that uses it.
+    RetryerBuilder<Void> retryerBuilder = RetryerBuilder.<Void>newBuilder();
+    retryerBuilder = retryerBuilder.retryIfExceptionOfType(IOException.class);
+
+    int maxAttempts = workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_STREAM_FILES_COUNT, 3);
+    retryerBuilder = retryerBuilder.withStopStrategy(StopStrategies.stopAfterAttempt(maxAttempts));
+
+    int waitMillis =
+        workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_STREAM_FILES_WAIT_TIME, 10000);
+    retryerBuilder = retryerBuilder.withWaitStrategy(WaitStrategies.fixedWait(waitMillis, TimeUnit.MILLISECONDS));
+
+    _getRetryer = retryerBuilder.build();
+  }
+
+  /**
+   * Returns the next batch of records from the exported files, opening the files one after another.
+   *
+   * <p>Each call reads up to {@code batchSize} CSV records from the current file and converts each of them with
+   * {@link Utils#csvToJsonObject}. When {@link #currentReaderDone()} reports that the current reader has nothing
+   * left, the current session is closed and the next file in {@code fileList} is opened. Once the index moves past
+   * the last file, the job is marked finished and an empty record set is returned.
+   *
+   * @param fileList ids of the files to stream, in the order they are read
+   * @param header column names passed to the CSV-to-JSON conversion
+   * @return the records read by this call
+   * @throws DataRecordException if reading fails with an {@link IOException} (the job is then also marked failed)
+   *     or if the connection for a new file cannot be established
+   */
+  public RecordSet<JsonElement> streamFiles(List<String> fileList, List<String> header)
+      throws DataRecordException {
+    try {
+      if (currentReaderDone()) {
+        // Advance to the next file; the index starts at -1, so the first call opens file 0.
+        ++_currentFileIndex;
+        closeCurrentSession();
+        if (_currentFileIndex >= fileList.size()) {
+          log.info("Finished streaming all files.");
+          _jobFinished = true;
+          return new RecordSetList<>();
+        }
+        initializeForNewFile(fileList);
+      }
+      log.info(String
+          .format("Streaming file at index %s with id %s ...", _currentFileIndex, fileList.get(_currentFileIndex)));
+      // NOTE(review): a new CSV reader is wrapped around the same _currentReader on every call; presumably it
+      // does not read ahead beyond the records it returns - confirm.
+      InputStreamCSVReader reader = new InputStreamCSVReader(_currentReader);
+      // _skipHeaderIndex trails _currentFileIndex until the first record (the header) of this file is consumed.
+      if (_skipHeaderIndex == _currentFileIndex) {
+        reader.nextRecord(); //skip header
+        ++_skipHeaderIndex;
+      }
+
+      RecordSetList<JsonElement> rs = new RecordSetList<>();
+      List<String> csvRecord;
+      int count = 0;
+      while ((csvRecord = reader.nextRecord()) != null) {
+        rs.add(Utils.csvToJsonObject(header, csvRecord, header.size()));
+        ++_totalRecords;
+        // Stop once a full batch has been collected; the rest of the file is read by later calls.
+        if (++count >= batchSize) {
+          break;
+        }
+      }
+      log.info("Total number of records downloaded: " + _totalRecords);
+      return rs;
+    } catch (IOException e) {
+      // Best-effort cleanup: a failure while closing is only logged so that the original error is reported.
+      try {
+        closeCurrentSession();
+      } catch (IOException e1) {
+        log.error(e1.getMessage());
+      }
+      _jobFailed = true;
+      throw new DataRecordException("Failed to get records from Zuora: " + e.getMessage(), e);
+    }
+  }
+
+  private void initializeForNewFile(List<String> fileList)
+      throws DataRecordException {
+    final String fileId = fileList.get(_currentFileIndex);
+    log.info(String.format("Start streaming file at index %s with id %s", _currentFileIndex, fileId));
+
+    try {
+      _getRetryer.call(new Callable<Void>() {
+        @Override
+        public Void call()
+            throws Exception {
+          Pair<HttpsURLConnection, BufferedReader> initialized = createReader(fileId, _workUnitState);
+          _currentConnection = initialized.getLeft();
+          _currentReader = initialized.getRight();
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      throw new DataRecordException(
+          String.format("Retryer failed: Build connection for streaming failed for file id: %s", fileId), e);
+    }
+  }
+
+  /**
+   * Opens a connection to the given file and wraps its response stream in a UTF-8 reader.
+   *
+   * <p>The stream is decompressed with {@link GZIPInputStream} when the configured output format is "gzip"
+   * (case-insensitive). If opening or wrapping the stream fails, the connection is disconnected before the
+   * exception is rethrown, so retries of this method do not leave connections open.
+   *
+   * @param fileId id of the file to download
+   * @param workUnitState configuration passed to {@code ZuoraUtil.getConnection}
+   * @return the open connection together with the reader over its response
+   * @throws IOException if the stream cannot be opened or is not valid gzip data when gzip is expected
+   */
+  private Pair<HttpsURLConnection, BufferedReader> createReader(String fileId, WorkUnitState workUnitState)
+      throws IOException {
+    HttpsURLConnection connection = ZuoraUtil.getConnection(_client.getEndPoint("file/" + fileId), workUnitState);
+    try {
+      connection.setRequestProperty("Accept", "application/json");
+      InputStream stream = connection.getInputStream();
+      if (StringUtils.isNotBlank(outputFormat) && outputFormat.equalsIgnoreCase("gzip")) {
+        stream = new GZIPInputStream(stream);
+      }
+      return new ImmutablePair<>(connection, new BufferedReader(new InputStreamReader(stream, "UTF-8")));
+    } catch (IOException | RuntimeException e) {
+      // The caller never sees this connection on failure, so it has to be released here.
+      connection.disconnect();
+      throw e;
+    }
+  }
+
+  private void closeCurrentSession()
+      throws IOException {
+    if (_currentConnection != null) {
+      _currentConnection.disconnect();
+    }
+    if (_currentReader != null) {
+      _currentReader.close();
+    }
+  }
+
+  /**
+   * Tells whether a new file has to be opened: either no reader exists yet, or the current reader reports that it
+   * is not ready.
+   *
+   * <p>NOTE(review): {@code ready()} returning {@code false} is used here as "nothing left to read" - confirm
+   * that this holds for the underlying HTTPS stream.
+   *
+   * @return {@code true} if there is no current reader or it is not ready
+   * @throws IOException if querying the reader fails
+   */
+  private boolean currentReaderDone()
+      throws IOException {
+    if (_currentReader == null) {
+      return true;
+    }
+    return !_currentReader.ready();
+  }
+
+  /**
+   * @return {@code true} once {@code streamFiles} has moved past the last file of the list
+   */
+  public boolean isJobFinished() {
+    return _jobFinished;
+  }
+
+  /**
+   * @return {@code true} once {@code streamFiles} has failed with an {@code IOException}
+   */
+  public boolean isJobFailed() {
+    return _jobFailed;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientImpl.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientImpl.java
new file mode 100644
index 0000000..3f6c0bf
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraClientImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.HttpsURLConnection;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommandOutput;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.jdbc.SqlQueryUtils;
+
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+
+@Alpha
+@Slf4j
+class ZuoraClientImpl implements ZuoraClient {
+  private static final Gson GSON = new Gson();
+  private final WorkUnitState _workUnitState;
+  private final String _hostName;
+  private final Retryer<CommandOutput<RestApiCommand, String>> _postRetryer;
+  private final Retryer<List<String>> _getRetryer;
+
+  /**
+   * Creates a client bound to the host configured in {@code workUnitState}.
+   *
+   * <p>Two retryers are set up, both retrying only on {@link IOException} with a fixed wait between attempts:
+   * one for POST requests (default 20 attempts, 60000 ms wait) and one for fetching file ids (default 30
+   * attempts, 30000 ms wait).
+   *
+   * @param workUnitState configuration of the current work unit
+   */
+  ZuoraClientImpl(WorkUnitState workUnitState) {
+    _workUnitState = workUnitState;
+    _hostName = workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
+
+    // Retryer for POST requests; each property is read right before the strategy that uses it.
+    RetryerBuilder<CommandOutput<RestApiCommand, String>> postBuilder =
+        RetryerBuilder.<CommandOutput<RestApiCommand, String>>newBuilder();
+    postBuilder = postBuilder.retryIfExceptionOfType(IOException.class);
+    int postAttempts = workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_POST_COUNT, 20);
+    postBuilder = postBuilder.withStopStrategy(StopStrategies.stopAfterAttempt(postAttempts));
+    int postWaitMillis = workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_POST_WAIT_TIME, 60000);
+    postBuilder = postBuilder.withWaitStrategy(WaitStrategies.fixedWait(postWaitMillis, TimeUnit.MILLISECONDS));
+    _postRetryer = postBuilder.build();
+
+    // Retryer for fetching the file ids of a job.
+    RetryerBuilder<List<String>> getBuilder = RetryerBuilder.<List<String>>newBuilder();
+    getBuilder = getBuilder.retryIfExceptionOfType(IOException.class);
+    int getAttempts = workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_GET_FILES_COUNT, 30);
+    getBuilder = getBuilder.withStopStrategy(StopStrategies.stopAfterAttempt(getAttempts));
+    int getWaitMillis =
+        workUnitState.getPropAsInt(ZuoraConfigurationKeys.ZUORA_API_RETRY_GET_FILES_WAIT_TIME, 30000);
+    getBuilder = getBuilder.withWaitStrategy(WaitStrategies.fixedWait(getWaitMillis, TimeUnit.MILLISECONDS));
+    _getRetryer = getBuilder.build();
+  }
+
+  /**
+   * Builds the single POST command that submits the batch query.
+   *
+   * <p>The query is the configured one, or {@code SELECT * FROM <entity>} when none is configured. Every predicate
+   * condition is added to it, followed by an optional {@code LIMIT} clause. The command carries two parameters:
+   * the "batch-query/" endpoint and the JSON-serialized request payload.
+   *
+   * @param predicateList predicates to add to the query; may be {@code null}
+   * @return a singleton list holding the POST command
+   */
+  @Override
+  public List<Command> buildPostCommand(List<Predicate> predicateList) {
+    String fallbackQuery = "SELECT * FROM " + _workUnitState.getProp(ConfigurationKeys.SOURCE_ENTITY);
+    String sql = _workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_QUERY, fallbackQuery);
+
+    if (predicateList != null) {
+      for (Predicate condition : predicateList) {
+        sql = SqlQueryUtils.addPredicate(sql, condition.getCondition());
+      }
+    }
+
+    String limit = _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_ROW_LIMIT);
+    if (!StringUtils.isBlank(limit)) {
+      sql = sql + " LIMIT " + limit;
+    }
+
+    String jobName = _workUnitState.getProp(ConfigurationKeys.JOB_NAME_KEY);
+    String deletedColumn = _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_DELTED_COLUMN, "");
+    List<ZuoraQuery> queries = Lists.newArrayList(new ZuoraQuery(jobName, sql, deletedColumn));
+
+    String partner = _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_PARTNER, "sample");
+    String project = _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_PROJECT, "sample");
+    String apiName = _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_API_NAME, "sample");
+    String format = _workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_OUTPUT_FORMAT, "csv");
+    String version = _workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_VERSION, "1.1");
+    ZuoraParams payload = new ZuoraParams(partner, project, queries, apiName, format, version);
+
+    // Parameter 0 is the endpoint, parameter 1 the JSON payload.
+    List<String> commandParams = Lists.newLinkedList();
+    commandParams.add(getEndPoint("batch-query/"));
+    commandParams.add(GSON.toJson(payload));
+    return Collections.singletonList(new RestApiCommand().build(commandParams, RestApiCommand.RestApiCommandType.POST));
+  }
+
+  /**
+   * Executes the POST command through {@code _postRetryer}.
+   *
+   * @param command command whose parameters are the endpoint and the payload
+   * @return the command output holding the response body
+   * @throws DataRecordException if the retryer gives up or the request fails with a non-retried exception
+   */
+  @Override
+  public CommandOutput<RestApiCommand, String> executePostRequest(final Command command)
+      throws DataRecordException {
+    final Callable<CommandOutput<RestApiCommand, String>> postAttempt =
+        new Callable<CommandOutput<RestApiCommand, String>>() {
+          @Override
+          public CommandOutput<RestApiCommand, String> call()
+              throws Exception {
+            return executePostRequestInternal(command);
+          }
+        };
+
+    try {
+      return _postRetryer.call(postAttempt);
+    } catch (Exception e) {
+      throw new DataRecordException("Post request failed for command: " + command.toString(), e);
+    }
+  }
+
+  /**
+   * Extracts the job id from the response of the batch-query POST request.
+   *
+   * <p>The first result value of {@code postResponse} is parsed as a JSON object and its {@code id} member is
+   * returned.
+   *
+   * @param postResponse output of the POST request
+   * @return the value of the {@code id} member of the response
+   * @throws DataRecordException if the response has no output, is empty, or does not contain an {@code id}
+   */
+  public static String getJobId(CommandOutput<?, ?> postResponse)
+      throws DataRecordException {
+    // The result values of the REST commands used by this client are strings.
+    @SuppressWarnings("unchecked")
+    Iterator<String> itr = (Iterator<String>) postResponse.getResults().values().iterator();
+    if (!itr.hasNext()) {
+      throw new DataRecordException("Failed to get data from Zuora; REST postResponse has no output");
+    }
+
+    String stringResponse = itr.next();
+    log.info("Zuora post response: " + stringResponse);
+    JsonObject jsonObject = GSON.fromJson(stringResponse, JsonObject.class);
+    // Gson returns null for an empty body; a body without "id" would otherwise fail with a NullPointerException.
+    if (jsonObject == null || !jsonObject.has("id") || jsonObject.get("id").isJsonNull()) {
+      throw new DataRecordException("Zuora post response does not contain a job id: " + stringResponse);
+    }
+    return jsonObject.get("id").getAsString();
+  }
+
+  /**
+   * Fetches the ids of the files produced by the given job, polling through {@code _getRetryer}.
+   *
+   * @param jobId id of the submitted job
+   * @return the file ids reported for the job
+   * @throws DataRecordException if the retryer gives up or the request fails with a non-retried exception
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public List<String> getFileIds(final String jobId)
+      throws DataRecordException, IOException {
+    log.info("Getting files for job " + jobId);
+    final String jobUrl = getEndPoint("batch-query/jobs/" + jobId);
+    final Command cmd =
+        new RestApiCommand().build(Collections.singleton(jobUrl), RestApiCommand.RestApiCommandType.GET);
+
+    final Callable<List<String>> fetchAttempt = new Callable<List<String>>() {
+      @Override
+      public List<String> call()
+          throws Exception {
+        return executeGetRequestInternal(cmd, jobId);
+      }
+    };
+
+    try {
+      return _getRetryer.call(fetchAttempt);
+    } catch (Exception e) {
+      throw new DataRecordException("Get request failed for command: " + cmd.toString(), e);
+    }
+  }
+
+  /**
+   * Performs one poll of the job status and, once the job is completed, collects the file ids of its batches.
+   *
+   * <p>Any status other than "completed" is signalled with an {@link IOException} so that the calling retryer
+   * polls again.
+   *
+   * @param cmd GET command for the job URL
+   * @param jobId id of the job, used for logging and error messages
+   * @return the {@code fileId} of every entry in the {@code batches} array of the response
+   * @throws IOException if the request fails or the job is not completed yet
+   * @throws DataRecordException if the response has no output
+   */
+  private List<String> executeGetRequestInternal(Command cmd, String jobId)
+      throws IOException, DataRecordException {
+    CommandOutput<RestApiCommand, String> response = executeGetRequest(cmd);
+    Iterator<String> results = response.getResults().values().iterator();
+    if (!results.hasNext()) {
+      throw new DataRecordException("Failed to get file Ids based on job id " + jobId);
+    }
+
+    String body = results.next();
+    JsonObject job = GSON.fromJson(body, JsonObject.class).getAsJsonObject();
+    String status = job.get("status").getAsString();
+    log.info(String.format("Job %s %s: %s", jobId, status, body));
+    if (!status.equals("completed")) {
+      throw new IOException("Retrying... This exception will be handled by retryer.");
+    }
+
+    List<String> fileIds = Lists.newArrayList();
+    Iterator<JsonElement> batches = job.get("batches").getAsJsonArray().iterator();
+    while (batches.hasNext()) {
+      JsonObject batch = batches.next().getAsJsonObject();
+      fileIds.add(batch.get("fileId").getAsString());
+    }
+    log.info("Get Files Response - FileIds: " + fileIds);
+    return fileIds;
+  }
+
+  /**
+   * Sends a GET request to the URL in the first parameter of {@code cmd} and returns the response body.
+   *
+   * <p>The connection is always disconnected before this method returns.
+   *
+   * @param cmd command whose first parameter is the URL to request
+   * @return a command output mapping {@code cmd} to the response body
+   * @throws IOException if the request fails
+   */
+  @Override
+  public CommandOutput<RestApiCommand, String> executeGetRequest(final Command cmd)
+      throws IOException {
+    HttpsURLConnection conn = null;
+    try {
+      final String url = cmd.getParams().get(0);
+      conn = ZuoraUtil.getConnection(url, _workUnitState);
+      conn.setRequestProperty("Accept", "application/json");
+
+      final String responseBody = ZuoraUtil.getStringFromInputStream(conn.getInputStream());
+      final CommandOutput<RestApiCommand, String> commandOutput = new RestApiCommandOutput();
+      commandOutput.put((RestApiCommand) cmd, responseBody);
+      return commandOutput;
+    } finally {
+      if (null != conn) {
+        conn.disconnect();
+      }
+    }
+  }
+
+  /**
+   * Performs one POST attempt: writes the payload (parameter 1) to the endpoint (parameter 0) and reads the
+   * response body.
+   *
+   * <p>The response lines are concatenated without line separators. The request stream and the response reader
+   * are closed through try-with-resources, and the connection is disconnected in all cases, even when closing a
+   * stream fails.
+   *
+   * @param command command whose parameters are the endpoint and the payload
+   * @return a command output mapping {@code command} to the response body
+   * @throws IOException if writing the request or reading the response fails
+   */
+  private CommandOutput<RestApiCommand, String> executePostRequestInternal(Command command)
+      throws IOException {
+    List<String> params = command.getParams();
+    String payLoad = params.get(1);
+    log.info("Executing post request with payLoad:" + payLoad);
+
+    HttpsURLConnection connection = null;
+    try {
+      connection = ZuoraUtil.getConnection(params.get(0), _workUnitState);
+      connection.setDoOutput(true);
+      connection.setRequestMethod("POST");
+
+      // Close the request body before asking for the response.
+      try (OutputStream os = connection.getOutputStream()) {
+        os.write(payLoad.getBytes("UTF-8"));
+        os.flush();
+      }
+
+      StringBuilder result = new StringBuilder();
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"))) {
+        String line;
+        while ((line = br.readLine()) != null) {
+          result.append(line);
+        }
+      }
+
+      CommandOutput<RestApiCommand, String> output = new RestApiCommandOutput();
+      output.put((RestApiCommand) command, result.toString());
+      return output;
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    }
+  }
+
+  /**
+   * Builds the full URL by appending {@code relativeUrl} to the configured host name. No separator is inserted,
+   * so the host name is expected to end with one.
+   *
+   * @param relativeUrl path relative to the host
+   * @return the host name followed by {@code relativeUrl}
+   */
+  @Override
+  public String getEndPoint(String relativeUrl) {
+    return _hostName + relativeUrl;
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraConfigurationKeys.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraConfigurationKeys.java
new file mode 100644
index 0000000..7e644cc
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraConfigurationKeys.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Names of the job properties read by the Zuora connector. This class only holds constants and cannot be
+ * instantiated.
+ */
+@Alpha
+public class ZuoraConfigurationKeys {
+  private ZuoraConfigurationKeys() {
+  }
+
+  // Output format of the exported files; "csv" is sent when unset, and "gzip" makes the streamer decompress.
+  public static final String ZUORA_OUTPUT_FORMAT = "zuora.output.format";
+  // Values sent in the query request payload; each falls back to "sample" when unset.
+  public static final String ZUORA_API_NAME = "zuora.api.name";
+  public static final String ZUORA_PARTNER = "zuora.partner";
+  public static final String ZUORA_PROJECT = "zuora.project";
+
+  /**
+   * Name of the deleted column to request (an empty string is sent when unset).
+   *
+   * <p>If you add a deleted column, for example {@code zuora.deleted.column=IsDeleted},
+   * the schema needs to be changed accordingly.
+   * For example, the column below needs to be included as the first column in your schema definition:
+   * { "columnName":"IsDeleted", "isNullable":"FALSE", "dataType":{"type":"boolean"}, "comment":"" }
+   *
+   * <p>Check the documentation at
+   * https://knowledgecenter.zuora.com/DC_Developers/T_Aggregate_Query_API/BA_Stateless_and_Stateful_Modes
+   *
+   * <p>Note: the constant name is misspelled ("DELTED"); it is kept as is because it is public.
+   */
+  public static final String ZUORA_DELTED_COLUMN = "zuora.deleted.column";
+  public static final String ZUORA_TIMESTAMP_COLUMNS = "zuora.timestamp.columns";
+  // When set, appended to the query as " LIMIT <value>".
+  public static final String ZUORA_ROW_LIMIT = "zuora.row.limit";
+
+  // Retry settings for the query POST request: number of attempts (default 20) and wait in ms (default 60000).
+  public static final String ZUORA_API_RETRY_POST_COUNT = "zuora.api.retry.post.count";
+  public static final String ZUORA_API_RETRY_POST_WAIT_TIME = "zuora.api.retry.post.wait_time";
+  // Retry settings for fetching file ids: number of attempts (default 30) and wait in ms (default 30000).
+  public static final String ZUORA_API_RETRY_GET_FILES_COUNT = "zuora.api.retry.get_files.count";
+  public static final String ZUORA_API_RETRY_GET_FILES_WAIT_TIME = "zuora.api.retry.get_files.wait_time";
+  // Retry settings for opening a file stream: number of attempts (default 3) and wait in ms (default 10000).
+  public static final String ZUORA_API_RETRY_STREAM_FILES_COUNT = "zuora.api.retry.stream_files.count";
+  public static final String ZUORA_API_RETRY_STREAM_FILES_WAIT_TIME = "zuora.api.retry.stream_files.wait_time";
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraDeletedColumn.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraDeletedColumn.java
new file mode 100644
index 0000000..9483123
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraDeletedColumn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.Serializable;
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Serializable holder for the name of a deleted column.
+ */
+@Alpha
+public class ZuoraDeletedColumn implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /** Name of the column. */
+  String column;
+
+  /**
+   * @param columnName name of the column to hold
+   */
+  ZuoraDeletedColumn(String columnName) {
+    this.column = columnName;
+  }
+
+  /**
+   * @return the column name
+   */
+  public String getColumn() {
+    return this.column;
+  }
+
+  /**
+   * @param column the new column name
+   */
+  public void setColumn(String column) {
+    this.column = column;
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraExtractor.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraExtractor.java
new file mode 100644
index 0000000..0eef4a6
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraExtractor.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.exception.HighWatermarkException;
+import org.apache.gobblin.source.extractor.exception.RecordCountException;
+import org.apache.gobblin.source.extractor.exception.SchemaException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.QueryBasedExtractor;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
+import org.apache.gobblin.source.extractor.schema.Schema;
+import org.apache.gobblin.source.extractor.utils.Utils;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+
+@Alpha
+@Slf4j
+public class ZuoraExtractor extends QueryBasedExtractor<JsonArray, JsonElement> {
+  private static final Gson GSON = new Gson();
+  private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
+  private static final String DATE_FORMAT = "yyyy-MM-dd";
+  private static final String HOUR_FORMAT = "HH";
+  private final ZuoraClient _client;
+  private ZuoraClientFilesStreamer _fileStreamer;
+  private List<String> _fileIds;
+  private List<String> _header;
+
+  /**
+   * Creates the extractor and a {@link ZuoraClientImpl} bound to the same work unit state.
+   *
+   * @param workUnitState state passed to the parent extractor and to the Zuora client
+   */
+  public ZuoraExtractor(WorkUnitState workUnitState) {
+    super(workUnitState);
+    this._client = new ZuoraClientImpl(workUnitState);
+  }
+
+  /**
+   * Returns the next batch of records streamed from the files of the Zuora job.
+   *
+   * <p>On the first call a POST request built from {@code predicateList} is executed and the
+   * resulting job's file ids are cached; later calls reuse the cached ids.
+   *
+   * @return an iterator over the streamed records, or {@code null} once the streamer reports that
+   *         the job is finished
+   */
+  @Override
+  public Iterator<JsonElement> getRecordSet(String schema, String entity, WorkUnit workUnit,
+      List<Predicate> predicateList)
+      throws DataRecordException, IOException {
+    // A fresh streamer is needed on the first call and whenever the previous one reports a failed job.
+    boolean needsNewStreamer = _fileStreamer == null || _fileStreamer.isJobFailed();
+    if (needsNewStreamer) {
+      _fileStreamer = new ZuoraClientFilesStreamer(workUnitState, _client);
+    }
+
+    if (_fileIds == null) {
+      // Only the first command returned by the client is submitted.
+      Command postCommand = _client.buildPostCommand(predicateList).get(0);
+      CommandOutput<RestApiCommand, String> response = _client.executePostRequest(postCommand);
+      _fileIds = _client.getFileIds(ZuoraClientImpl.getJobId(response));
+    }
+
+    if (_fileStreamer.isJobFinished()) {
+      return null;
+    }
+    return _fileStreamer.streamFiles(_fileIds, _header).iterator();
+  }
+
+  /**
+   * @return {@code true} when no file ids have been fetched yet, or when the current streamer
+   *         reports that its job failed
+   */
+  @Override
+  protected boolean isInitialPull() {
+    if (_fileIds == null) {
+      return true;
+    }
+    return _fileStreamer.isJobFailed();
+  }
+
+  /**
+   * Builds the output schema and the record header from the JSON schema configured under
+   * {@link ConfigurationKeys#SOURCE_SCHEMA}.
+   *
+   * <p>Each configured column is added to the header. Watermark columns and primary-key columns
+   * are marked non-nullable.
+   *
+   * @throws SchemaException if anything fails while reading or converting the configured schema
+   */
+  @Override
+  public void extractMetadata(String schema, String entity, WorkUnit workUnit)
+      throws SchemaException, IOException {
+    String watermarkFields = workUnit.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
+    String primaryKeyFields = workUnit.getProp(ConfigurationKeys.EXTRACT_PRIMARY_KEY_FIELDS_KEY);
+    JsonArray outputColumns = new JsonArray();
+    _header = new ArrayList<>();
+
+    try {
+      String sourceSchema = workUnit.getProp(ConfigurationKeys.SOURCE_SCHEMA);
+      JsonArray sourceColumns = GSON.fromJson(sourceSchema, JsonArray.class).getAsJsonArray();
+      for (JsonElement sourceColumn : sourceColumns) {
+        Schema column = GSON.fromJson(sourceColumn, Schema.class);
+        String name = column.getColumnName();
+        _header.add(name);
+
+        if (isWatermarkColumn(watermarkFields, name)) {
+          column.setWaterMark(true);
+          column.setNullable(false);
+        }
+
+        // A positive index marks the column as part of the primary key.
+        int keyPosition = getPrimarykeyIndex(primaryKeyFields, name);
+        column.setPrimaryKey(keyPosition);
+        if (keyPosition > 0) {
+          column.setNullable(false);
+        }
+
+        // Convert the updated Schema bean back into a JsonObject through its JSON text.
+        String columnJson = GSON.toJson(column);
+        outputColumns.add(GSON.fromJson(columnJson, JsonObject.class).getAsJsonObject());
+      }
+
+      log.info("Update Schema is:" + outputColumns);
+      setOutputSchema(outputColumns);
+    } catch (Exception e) {
+      throw new SchemaException("Failed to get schema using rest api; error - " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Not supported by this extractor.
+   *
+   * @throws HighWatermarkException always; the message asks for
+   *         {@code source.querybased.skip.high.watermark.calc} to be set to true
+   */
+  @Override
+  public long getMaxWatermark(String schema, String entity, String watermarkColumn,
+      List<Predicate> snapshotPredicateList, String watermarkSourceFormat)
+      throws HighWatermarkException {
+    String unsupportedMessage =
+        "GetMaxWatermark with query is not supported! Please set source.querybased.skip.high.watermark.calc to true.";
+    throw new HighWatermarkException(unsupportedMessage);
+  }
+
+  /**
+   * Returns a fixed positive placeholder instead of querying the source for a record count.
+   *
+   * @return always {@code 1}
+   */
+  @Override
+  public long getSourceCount(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList)
+      throws RecordCountException {
+    // Setting source.querybased.skip.count.calc to true would set the source count to -1 instead.
+    // The expected record count feeds the tablesWithNoUpdatesOnPreviousRun check in QueryBasedSource,
+    // so a positive number is returned to bypass that check and move the low watermark forward.
+    final long positivePlaceholderCount = 1;
+    return positivePlaceholderCount;
+  }
+
+  /**
+   * Maps a watermark type to the date/time pattern used for it by this extractor.
+   *
+   * @param watermarkType the watermark type; TIMESTAMP, DATE and HOUR are handled
+   * @return the pattern for the given type
+   * @throws RuntimeException for any other watermark type
+   */
+  @Override
+  public String getWatermarkSourceFormat(WatermarkType watermarkType) {
+    final String sourceFormat;
+    switch (watermarkType) {
+      case TIMESTAMP:
+        sourceFormat = TIMESTAMP_FORMAT;
+        break;
+      case DATE:
+        sourceFormat = DATE_FORMAT;
+        break;
+      case HOUR:
+        sourceFormat = HOUR_FORMAT;
+        break;
+      default:
+        throw new RuntimeException("Watermark type " + watermarkType.toString() + " is not supported");
+    }
+    return sourceFormat;
+  }
+
+  /**
+   * Builds a predicate of the form {@code <column> <operator> '<value>'} with the value
+   * re-formatted from {@code valueFormat} to the hour pattern.
+   *
+   * @return the predicate text, which is also logged at info level
+   */
+  @Override
+  public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) {
+    String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, HOUR_FORMAT);
+    String hourPredicate = String.format("%s %s '%s'", column, operator, formattedValue);
+    log.info("Hour predicate is: " + hourPredicate);
+    return hourPredicate;
+  }
+
+  /**
+   * Builds a predicate of the form {@code <column> <operator> '<value>'} with the value
+   * re-formatted from {@code valueFormat} to the date pattern.
+   *
+   * @return the predicate text, which is also logged at info level
+   */
+  @Override
+  public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) {
+    String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, DATE_FORMAT);
+    String datePredicate = String.format("%s %s '%s'", column, operator, formattedValue);
+    log.info("Date predicate is: " + datePredicate);
+    return datePredicate;
+  }
+
+  /**
+   * Builds a predicate of the form {@code <column> <operator> '<value>'} with the value
+   * re-formatted from {@code valueFormat} to the timestamp pattern.
+   *
+   * @return the predicate text, which is also logged at info level
+   */
+  @Override
+  public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) {
+    String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, TIMESTAMP_FORMAT);
+    String timeStampPredicate = String.format("%s %s '%s'", column, operator, formattedValue);
+    log.info("Timestamp predicate is: " + timeStampPredicate);
+    return timeStampPredicate;
+  }
+
+  /**
+   * @return an immutable mapping from source type names to the type names used by this extractor;
+   *         {@code datetime} maps to {@code timestamp}, {@code decimal} to {@code double} and
+   *         {@code varchar} to {@code string}, all other listed types map to themselves
+   */
+  @Override
+  public Map<String, String> getDataTypeMap() {
+    ImmutableMap.Builder<String, String> types = ImmutableMap.<String, String>builder();
+    types.put("date", "date");
+    types.put("datetime", "timestamp");
+    types.put("time", "time");
+    types.put("string", "string");
+    types.put("int", "int");
+    types.put("long", "long");
+    types.put("float", "float");
+    types.put("double", "double");
+    types.put("decimal", "double");
+    types.put("varchar", "string");
+    types.put("boolean", "boolean");
+    return types.build();
+  }
+
+  /**
+   * Derives the record header from the first line of a file.
+   *
+   * <p>When no source schema is configured, the output schema is resolved from the header, using
+   * the configured timestamp columns (lower-cased, spaces removed, comma separated).
+   *
+   * @param firstLine the raw column labels of the first line
+   * @return the cleaned column names
+   */
+  List<String> extractHeader(ArrayList<String> firstLine) {
+    List<String> header = ZuoraUtil.getHeader(firstLine);
+
+    boolean schemaMissing = StringUtils.isBlank(workUnitState.getProp(ConfigurationKeys.SOURCE_SCHEMA));
+    if (schemaMissing) {
+      String configuredColumns = workUnitState.getProp(ZuoraConfigurationKeys.ZUORA_TIMESTAMP_COLUMNS);
+      List<String> timeStampColumns;
+      if (StringUtils.isNotBlank(configuredColumns)) {
+        timeStampColumns = Arrays.asList(configuredColumns.toLowerCase().replaceAll(" ", "").split(","));
+      } else {
+        timeStampColumns = Lists.newArrayList();
+      }
+      setSchema(header, timeStampColumns);
+    }
+
+    log.info("Record header: " + header);
+    return header;
+  }
+
+  /**
+   * Resolves an output schema from plain column names and installs it as the output schema.
+   *
+   * <p>Watermark columns become non-nullable timestamps. Columns that are neither watermark nor
+   * primary key are marked nullable. Columns listed in {@code timestampColumns} get the
+   * timestamp data type.
+   *
+   * @param cols column names, in output order
+   * @param timestampColumns lower-cased names of timestamp columns; may be {@code null}
+   */
+  private void setSchema(List<String> cols, List<String> timestampColumns) {
+    JsonArray resolvedColumns = new JsonArray();
+    for (String columnName : cols) {
+      Schema column = new Schema();
+      column.setColumnName(columnName);
+      column.setComment("resolved");
+
+      String deltaFields = workUnit.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
+      boolean watermark = isWatermarkColumn(deltaFields, columnName);
+      column.setWaterMark(watermark);
+
+      String primaryKeyFields = workUnit.getProp(ConfigurationKeys.EXTRACT_PRIMARY_KEY_FIELDS_KEY);
+      int keyPosition = getPrimarykeyIndex(primaryKeyFields, columnName);
+
+      if (watermark) {
+        column.setNullable(false);
+        column.setDataType(convertDataType(columnName, "timestamp", null, null));
+      } else if (keyPosition == 0) {
+        // set all columns as nullable except primary key and watermark columns
+        column.setNullable(true);
+      }
+
+      boolean listedAsTimestamp = timestampColumns != null && timestampColumns.contains(columnName.toLowerCase());
+      if (listedAsTimestamp) {
+        column.setDataType(convertDataType(columnName, "timestamp", null, null));
+      }
+
+      column.setPrimaryKey(keyPosition);
+
+      // Convert the Schema bean into a JsonObject through its JSON text.
+      String columnJson = GSON.toJson(column);
+      resolvedColumns.add(GSON.fromJson(columnJson, JsonObject.class).getAsJsonObject());
+    }
+
+    log.info("Resolved Schema: " + resolvedColumns);
+    this.setOutputSchema(resolvedColumns);
+  }
+
+  /**
+   * Intentionally empty: this implementation performs no work when the connection is closed.
+   */
+  @Override
+  public void closeConnection()
+      throws Exception {
+  }
+
+  /**
+   * Not supported by this extractor; records are obtained through
+   * {@code getRecordSet} instead.
+   *
+   * @throws UnsupportedOperationException always (a {@link RuntimeException}, so existing
+   *         handlers that catch {@code RuntimeException} keep working)
+   */
+  @Override
+  public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit,
+      List<Predicate> predicateList)
+      throws IOException {
+    // Use the standard exception type for an unimplemented operation rather than a raw RuntimeException.
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Currently a no-op: the supplied timeout is ignored.
+   *
+   * @param timeOut requested timeout; not used by this implementation
+   */
+  @Override
+  public void setTimeOut(int timeOut) {
+    // Ignore for now
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraParams.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraParams.java
new file mode 100644
index 0000000..2d1c46d
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraParams.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Serializable bean holding the parameters of a Zuora request: a name, partner, project, the
+ * list of {@link ZuoraQuery queries}, the output format and the API version.
+ *
+ * <p>Three further settings start with fixed defaults: {@code encrypted} is {@code "none"},
+ * {@code useQueryLabels} is {@code "false"} and {@code dateTimeUtc} is {@code "true"}.
+ */
+@Alpha
+public class ZuoraParams implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  // Field declaration order and package-private visibility are kept as they were.
+  String name;
+  String partner;
+  String project;
+  List<ZuoraQuery> queries;
+  String format;
+  String version;
+  String encrypted = "none";
+  String useQueryLabels = "false";
+  String dateTimeUtc = "true";
+
+  /**
+   * Creates the parameter set; the remaining settings keep their default values.
+   */
+  ZuoraParams(String partner, String project, List<ZuoraQuery> queries, String name, String format, String version) {
+    this.name = name;
+    this.partner = partner;
+    this.project = project;
+    this.queries = queries;
+    this.format = format;
+    this.version = version;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getPartner() {
+    return this.partner;
+  }
+
+  public void setPartner(String partner) {
+    this.partner = partner;
+  }
+
+  public String getProject() {
+    return this.project;
+  }
+
+  public void setProject(String project) {
+    this.project = project;
+  }
+
+  public List<ZuoraQuery> getQueries() {
+    return this.queries;
+  }
+
+  public void setQueries(List<ZuoraQuery> queries) {
+    this.queries = queries;
+  }
+
+  public String getFormat() {
+    return this.format;
+  }
+
+  public void setFormat(String format) {
+    this.format = format;
+  }
+
+  public String getVersion() {
+    return this.version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
+
+  public String getEncrypted() {
+    return this.encrypted;
+  }
+
+  public void setEncrypted(String encrypted) {
+    this.encrypted = encrypted;
+  }
+
+  public String getUseQueryLabels() {
+    return this.useQueryLabels;
+  }
+
+  public void setUseQueryLabels(String useQueryLabels) {
+    this.useQueryLabels = useQueryLabels;
+  }
+
+  public String getDateTimeUtc() {
+    return this.dateTimeUtc;
+  }
+
+  public void setDateTimeUtc(String dateTimeUtc) {
+    this.dateTimeUtc = dateTimeUtc;
+  }
+
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraQuery.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraQuery.java
new file mode 100644
index 0000000..d703fcd
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraQuery.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.Serializable;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import com.google.common.base.Strings;
+
+
+@Alpha
+public class ZuoraQuery implements Serializable {
+  private static final long serialVersionUID = 1L;
+  public String name;
+  public String query;
+  public String type = "zoqlexport";
+  //Check the documentation here:
+  //https://knowledgecenter.zuora.com/DC_Developers/T_Aggregate_Query_API/BA_Stateless_and_Stateful_Modes
+  public ZuoraDeletedColumn deleted = null;
+
+  ZuoraQuery(String name, String query, String deleteColumn) {
+    super();
+    this.name = name;
+    this.query = query;
+    if (!Strings.isNullOrEmpty(deleteColumn)) {
+      deleted = new ZuoraDeletedColumn(deleteColumn);
+    }
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraSource.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraSource.java
new file mode 100644
index 0000000..37b63b3
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
+import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+
+
+/**
+ * An implementation of Zuora source to get work units.
+ *
+ * <p>Extractors are created as {@link ZuoraExtractor} instances.
+ */
+@Alpha
+public class ZuoraSource extends QueryBasedSource<JsonArray, JsonElement> {
+  // Log under this class's own name rather than the parent's, so messages are attributed correctly.
+  private static final Logger LOG = LoggerFactory.getLogger(ZuoraSource.class);
+
+  /**
+   * Builds a {@link ZuoraExtractor} for the given work unit state.
+   *
+   * @param state the work unit state handed to the extractor
+   * @return the built extractor
+   * @throws IOException wrapping the {@link ExtractPrepareException} when the extractor cannot be prepared
+   */
+  @Override
+  public Extractor<JsonArray, JsonElement> getExtractor(WorkUnitState state) throws IOException {
+    try {
+      return new ZuoraExtractor(state).build();
+    } catch (ExtractPrepareException e) {
+      // Pass the exception to the logger so the stack trace is recorded, not just the message.
+      LOG.error("Failed to prepare extractor: error - " + e.getMessage(), e);
+      throw new IOException(e);
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraUtil.java b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraUtil.java
new file mode 100644
index 0000000..91edf5e
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/java/org/apache/gobblin/zuora/ZuoraUtil.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.zuora;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import javax.net.ssl.HttpsURLConnection;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.password.PasswordManager;
+
+import com.google.common.collect.Lists;
+
+
+@Alpha
+@Slf4j
+public class ZuoraUtil {
+
+  /** Private constructor: this class only exposes static helpers and is not instantiated. */
+  private ZuoraUtil() {
+  }
+
+  /**
+   * Opens an HTTPS connection to {@code urlPath}, configured from the work unit state.
+   *
+   * <p>A proxy is used when a proxy URL is configured. The content type is set to JSON, a Basic
+   * {@code Authorization} header is added when a user name is configured, and the connect timeout
+   * comes from {@link ConfigurationKeys#SOURCE_CONN_TIMEOUT} (default 30000).
+   *
+   * @param urlPath the URL to connect to
+   * @param workUnitState source of the proxy, credential and timeout settings
+   * @return the configured connection
+   * @throws IOException if the URL is malformed or the connection cannot be opened
+   */
+  public static HttpsURLConnection getConnection(String urlPath, WorkUnitState workUnitState)
+      throws IOException {
+    log.info("URL: " + urlPath);
+
+    URL target = new URL(urlPath);
+    String proxyHost = workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_USE_PROXY_URL);
+    HttpsURLConnection connection;
+    if (StringUtils.isBlank(proxyHost)) {
+      connection = (HttpsURLConnection) target.openConnection();
+    } else {
+      int proxyPort = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_CONN_USE_PROXY_PORT);
+      InetSocketAddress proxyAddress = new InetSocketAddress(proxyHost, proxyPort);
+      connection = (HttpsURLConnection) target.openConnection(new Proxy(Proxy.Type.HTTP, proxyAddress));
+    }
+
+    connection.setRequestProperty("Content-Type", "application/json");
+
+    String userName = workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME);
+    if (StringUtils.isNotBlank(userName)) {
+      PasswordManager passwords = PasswordManager.getInstance(workUnitState);
+      String password = passwords.readPassword(workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD));
+      // Basic authentication: base64 of "user:password".
+      byte[] encodedCredentials = new Base64().encode((userName + ":" + password).getBytes("UTF-8"));
+      connection.setRequestProperty("Authorization", "Basic " + new String(encodedCredentials, "UTF-8"));
+    }
+
+    int connectTimeout = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_CONN_TIMEOUT, 30000);
+    connection.setConnectTimeout(connectTimeout);
+    return connection;
+  }
+
+  /**
+   * Reads the whole stream as UTF-8 text and returns it as one string.
+   *
+   * <p>The content is read line by line and the lines are concatenated without their line
+   * terminators. Reading is best-effort: if an {@link IOException} occurs it is logged and
+   * whatever was read so far is returned. The stream is closed before returning.
+   *
+   * @param is the stream to read
+   * @return the concatenated lines; possibly partial or empty if reading failed
+   */
+  public static String getStringFromInputStream(InputStream is) {
+    StringBuilder sb = new StringBuilder();
+    // try-with-resources closes the reader (and the wrapped stream) on every path.
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        sb.append(line);
+      }
+    } catch (IOException e) {
+      // Report through the class logger instead of printing the stack trace to stderr.
+      log.error("Failed to read input stream; returning the content read so far", e);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Converts raw column labels into column names.
+   *
+   * <p>Each label is split on {@code ':'}; the second part is used when there are at least two
+   * parts, otherwise the first. All spaces are removed from the chosen part.
+   *
+   * @param cols the raw column labels
+   * @return the column names, in the same order as {@code cols}
+   */
+  public static List<String> getHeader(ArrayList<String> cols) {
+    List<String> names = Lists.newArrayList();
+    for (String rawLabel : cols) {
+      String[] parts = rawLabel.split(":");
+      String selected = parts.length >= 2 ? parts[1] : parts[0];
+      names.add(selected.replaceAll(" ", "").trim());
+    }
+    return names;
+  }
+}
diff --git a/gobblin-modules/gobblin-zuora/src/main/resources/zuora_sample.pull b/gobblin-modules/gobblin-zuora/src/main/resources/zuora_sample.pull
new file mode 100644
index 0000000..eace140
--- /dev/null
+++ b/gobblin-modules/gobblin-zuora/src/main/resources/zuora_sample.pull
@@ -0,0 +1,54 @@
+job.group=Zuora_Sample
+extract.namespace=Zuora_Sample
+source.class=org.apache.gobblin.zuora.ZuoraSource
+converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
+
+writer.destination.type=HDFS
+writer.output.format=AVRO
+writer.fs.uri=file://localhost/
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+
+source.querybased.extract.type=snapshot
+source.querybased.low.watermark.backup.secs=300
+source.max.number.of.partitions=1
+source.querybased.partition.interval=1
+source.querybased.fetch.size=10000
+source.querybased.schema=Zuora
+source.querybased.watermark.type=timestamp
+source.querybased.skip.high.watermark.calc=true
+source.querybased.start.value=CURRENTDAY-3
+# source.querybased.append.max.watermark.limit=CURRENTDATE
+
+source.conn.username=<USERNAME>
+source.conn.password=<PASSWORD RAW OR ENCRYPTED>
+encrypt.key.loc=<PASSWORD KEY FILE PATH>
+source.conn.host=https://www.zuora.com/apps/api/
+source.conn.version=1.2
+source.conn.timeout=30000
+
+zuora.output.format=gzip
+zuora.api.name=<ZUORA API NAME>
+zuora.partner=<ZUORA PARTNER>
+zuora.project=<ZUORA PROJECT>
+zuora.timestamp.columns=CreatedDate,UpdatedDate
+zuora.deleted.column=IsDeleted
+
+extract.table.type=snapshot_append
+extract.delta.fields=UpdatedDate
+extract.primary.key.fields=Id
+
+converter.avro.timestamp.format=yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss
+converter.avro.date.format=yyyy-MM-dd
+converter.avro.time.format=HH:mm:ss
+converter.avro.date.timezone=America/Los_Angeles
+converter.is.epoch.time.in.seconds=true
+source.timezone=America/Los_Angeles
+workunit.retry.enabled=true
+task.retry.intervalinsec=20
+task.maxretries=10
+
+writer.include.record.count.in.file.names=true
+
+# Proxy settings
+source.conn.use.proxy.url=<PROXY URL if any>
+source.conn.use.proxy.port=<PROXY PORT if any>
\ No newline at end of file