You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/14 00:53:03 UTC
incubator-gobblin git commit: [GOBBLIN-466] Reuse the same connector for salesforce dynamic partitioning
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 12bb1dcf9 -> c36f2c87b
[GOBBLIN-466] Reuse the same connector for salesforce dynamic partitioning
Closes #2338 from yukuai518/sf2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c36f2c87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c36f2c87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c36f2c87
Branch: refs/heads/master
Commit: c36f2c87b754032d0946df0e58514c6932e33a9c
Parents: 12bb1dc
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Apr 13 17:52:56 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Apr 13 17:52:56 2018 -0700
----------------------------------------------------------------------
.../org/apache/gobblin/salesforce/SalesforceSource.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c36f2c87/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index 3702c2e..aedd97b 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -109,6 +109,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
private static final Gson GSON = new Gson();
private boolean isEarlyStopped = false;
+ protected SalesforceConnector salesforceConnector = null;
public SalesforceSource() {
this.lineageInfo = Optional.absent();
@@ -485,12 +486,19 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
return histogram;
}
+ protected SalesforceConnector getConnector(State state) {
+ if (this.salesforceConnector == null) {
+ this.salesforceConnector = new SalesforceConnector(state);
+ }
+ return this.salesforceConnector;
+ }
+
/**
* Generate the histogram
*/
private Histogram getHistogram(String entity, String watermarkColumn, SourceState state,
Partition partition) {
- SalesforceConnector connector = new SalesforceConnector(state);
+ SalesforceConnector connector = getConnector(state);
try {
if (!connector.connect()) {
@@ -595,7 +603,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
return super.getSourceEntities(state);
}
- SalesforceConnector connector = new SalesforceConnector(state);
+ SalesforceConnector connector = getConnector(state);
try {
if (!connector.connect()) {
throw new RuntimeException("Failed to connect.");