You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by kh...@apache.org on 2017/04/20 06:03:31 UTC
zeppelin git commit: [ZEPPELIN-2318][branch-0.7] Fix proxy
configuration for http client of zeppelinhub storage layer
Repository: zeppelin
Updated Branches:
refs/heads/branch-0.7 69e70d515 -> 4dfb81500
[ZEPPELIN-2318][branch-0.7] Fix proxy configuration for http client of zeppelinhub storage layer
### What is this PR for?
This resolves the issue for `branch-0.7`, since the original PR #2198 had conflicts with that branch. For more details, see the original PR.
### What type of PR is it?
Bug Fix | Improvement
### Todos
* [x] - Task
### What is the Jira issue?
[ZEPPELIN-2318](https://issues.apache.org/jira/browse/ZEPPELIN-2318)
### How should this be tested?
See the testing instructions in #2198.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Khalid Huseynov <kh...@gmail.com>
Author: LeiWang <wa...@163.com>
Closes #2247 from khalidhuseynov/fix/branch-0.7-ZEPPELIN-2318 and squashes the following commits:
ec23c958a [Khalid Huseynov] edge case logs from error -> warn
e4db79ca0 [Khalid Huseynov] fix log
d8efb462f [Khalid Huseynov] fix websocket timing
e6622398d [Khalid Huseynov] add ssl setup
8f8109eaa [Khalid Huseynov] add close routine
7f3cd5040 [Khalid Huseynov] jetty client relay to asyncclient when proxy on
a65f73556 [Khalid Huseynov] add proxy client with asynclient library
6add668ff [Khalid Huseynov] add dependency in pom, resolve coflict
d4cacadaf [LeiWang] fix bugs for timer saver
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/4dfb8150
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/4dfb8150
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/4dfb8150
Branch: refs/heads/branch-0.7
Commit: 4dfb815004c3d5a611c3ce16c6f6050813e24f38
Parents: 69e70d5
Author: Khalid Huseynov <kh...@gmail.com>
Authored: Wed Mar 29 16:54:38 2017 +0900
Committer: Khalid Huseynov <kh...@gmail.com>
Committed: Thu Apr 20 15:04:15 2017 +0900
----------------------------------------------------------------------
pom.xml | 7 +
zeppelin-zengine/pom.xml | 5 +
.../repo/zeppelinhub/ZeppelinHubRepo.java | 3 +-
.../repo/zeppelinhub/rest/HttpProxyClient.java | 212 +++++++++++++++++++
.../rest/ZeppelinhubRestApiHandler.java | 114 +++++++---
.../zeppelinhub/websocket/ZeppelinClient.java | 17 +-
6 files changed, 320 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5010675..4df0094 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,7 @@
<jetty.version>9.2.15.v20160210</jetty.version>
<httpcomponents.core.version>4.3.3</httpcomponents.core.version>
<httpcomponents.client.version>4.3.6</httpcomponents.client.version>
+ <httpcomponents.asyncclient.version>4.0.2</httpcomponents.asyncclient.version>
<commons.lang.version>2.5</commons.lang.version>
<commons.configuration.version>1.9</commons.configuration.version>
<commons.codec.version>1.5</commons.codec.version>
@@ -174,6 +175,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>${httpcomponents.asyncclient.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons.lang.version}</version>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index efe1c8c..730ada8 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -114,6 +114,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.s3.version}</version>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
index 2f33f6f..cd94180 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -217,7 +217,8 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public void close() {
- //websocketClient.stop();
+ websocketClient.stop();
+ restApiClient.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java
new file mode 100644
index 0000000..690a8b6
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ssl.BrowserCompatHostnameVerifier;
+import org.apache.http.conn.ssl.SSLContexts;
+import org.apache.http.conn.ssl.X509HostnameVerifier;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOReactorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is http client class for the case of proxy usage
+ * jetty-client has issue with https over proxy for 9.2.x
+ * https://github.com/eclipse/jetty.project/issues/408
+ * https://github.com/eclipse/jetty.project/issues/827
+ *
+ */
+
+public class HttpProxyClient {
+ private static final Logger LOG = LoggerFactory.getLogger(HttpProxyClient.class);
+ public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
+
+ private CloseableHttpAsyncClient client;
+ private URI proxyUri;
+
+ public static HttpProxyClient newInstance(URI proxyUri) {
+ return new HttpProxyClient(proxyUri);
+ }
+
+ private HttpProxyClient(URI uri) {
+ this.proxyUri = uri;
+
+ client = getAsyncProxyHttpClient(proxyUri);
+ client.start();
+ }
+
+ public URI getProxyUri() {
+ return proxyUri;
+ }
+
+ private CloseableHttpAsyncClient getAsyncProxyHttpClient(URI proxyUri) {
+ LOG.info("Creating async proxy http client");
+ PoolingNHttpClientConnectionManager cm = getAsyncConnectionManager();
+ HttpHost proxy = new HttpHost(proxyUri.getHost(), proxyUri.getPort());
+
+ HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
+ if (cm != null) {
+ clientBuilder = clientBuilder.setConnectionManager(cm);
+ }
+
+ if (proxy != null) {
+ clientBuilder = clientBuilder.setProxy(proxy);
+ }
+ clientBuilder = setRedirects(clientBuilder);
+ return clientBuilder.build();
+ }
+
+ private PoolingNHttpClientConnectionManager getAsyncConnectionManager() {
+ ConnectingIOReactor ioReactor = null;
+ PoolingNHttpClientConnectionManager cm = null;
+ try {
+ ioReactor = new DefaultConnectingIOReactor();
+ // ssl setup
+ SSLContext sslcontext = SSLContexts.createSystemDefault();
+ X509HostnameVerifier hostnameVerifier = new BrowserCompatHostnameVerifier();
+ @SuppressWarnings("deprecation")
+ Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder
+ .<SchemeIOSessionStrategy>create()
+ .register("http", NoopIOSessionStrategy.INSTANCE)
+ .register("https", new SSLIOSessionStrategy(sslcontext, hostnameVerifier))
+ .build();
+
+ cm = new PoolingNHttpClientConnectionManager(ioReactor, sessionStrategyRegistry);
+ } catch (IOReactorException e) {
+ LOG.error("Couldn't initialize multi-threaded async client ", e);
+ return null;
+ }
+ return cm;
+ }
+
+ private HttpAsyncClientBuilder setRedirects(HttpAsyncClientBuilder clientBuilder) {
+ clientBuilder.setRedirectStrategy(new DefaultRedirectStrategy() {
+ /** Redirectable methods. */
+ private String[] REDIRECT_METHODS = new String[] {
+ HttpGet.METHOD_NAME, HttpPost.METHOD_NAME,
+ HttpPut.METHOD_NAME, HttpDelete.METHOD_NAME, HttpHead.METHOD_NAME
+ };
+
+ @Override
+ protected boolean isRedirectable(String method) {
+ for (String m : REDIRECT_METHODS) {
+ if (m.equalsIgnoreCase(method)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ return clientBuilder;
+ }
+
+ public String sendToZeppelinHub(HttpRequestBase request,
+ boolean withResponse) throws IOException {
+ return withResponse ?
+ sendAndGetResponse(request) : sendWithoutResponseBody(request);
+ }
+
+
+ private String sendWithoutResponseBody(HttpRequestBase request) throws IOException {
+ FutureCallback<HttpResponse> callback = getCallback(request);
+ client.execute(request, callback);
+ return StringUtils.EMPTY;
+ }
+
+ private String sendAndGetResponse(HttpRequestBase request) throws IOException {
+ String data = StringUtils.EMPTY;
+ try {
+ HttpResponse response = client.execute(request, null).get(30, TimeUnit.SECONDS);
+ int code = response.getStatusLine().getStatusCode();
+ if (code == 200) {
+ try (InputStream responseContent = response.getEntity().getContent()) {
+ data = IOUtils.toString(responseContent, "UTF-8");
+ }
+ } else {
+ LOG.error("ZeppelinHub {} {} returned with status {} ", request.getMethod(),
+ request.getURI(), code);
+ throw new IOException("Cannot perform " + request.getMethod() + " request to ZeppelinHub");
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException
+ | NullPointerException e) {
+ throw new IOException(e);
+ }
+ return data;
+ }
+
+ private FutureCallback<HttpResponse> getCallback(final HttpRequestBase request) {
+ return new FutureCallback<HttpResponse>() {
+
+ public void completed(final HttpResponse response) {
+ request.releaseConnection();
+ LOG.info("Note {} completed with {} status", request.getMethod(),
+ response.getStatusLine());
+ }
+
+ public void failed(final Exception ex) {
+ request.releaseConnection();
+ LOG.error("Note {} failed with {} message", request.getMethod(),
+ ex.getMessage());
+ }
+
+ public void cancelled() {
+ request.releaseConnection();
+ LOG.info("Note {} was canceled", request.getMethod());
+ }
+ };
+ }
+
+ public void stop() {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close proxy client ", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
index f2ae7b9..437386c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -27,6 +29,12 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.StringEntity;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
@@ -52,11 +60,10 @@ public class ZeppelinhubRestApiHandler {
private static final String USER_SESSION_HEADER = "X-User-Session";
private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
private static boolean PROXY_ON = false;
- private static String PROXY_HOST;
- private static int PROXY_PORT;
-
+ //TODO(xxx): possibly switch to jetty-client > 9.3.12 when adopt jvm 1.8
+ private static HttpProxyClient proxyClient;
private final HttpClient client;
- private final String zepelinhubUrl;
+ private String zepelinhubUrl;
public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl) {
return new ZeppelinhubRestApiHandler(zeppelinhubUrl);
@@ -65,8 +72,7 @@ public class ZeppelinhubRestApiHandler {
private ZeppelinhubRestApiHandler(String zeppelinhubUrl) {
this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
- //TODO(khalid):to make proxy conf consistent with Zeppelin confs
- //readProxyConf();
+ readProxyConf();
client = getAsyncClient();
try {
@@ -74,48 +80,41 @@ public class ZeppelinhubRestApiHandler {
} catch (Exception e) {
LOG.error("Cannot initialize ZeppelinHub REST async client", e);
}
-
}
-
+
private void readProxyConf() {
- //try reading http_proxy
- String proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
- System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
+ //try reading https_proxy
+ String proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
+ System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
if (StringUtils.isBlank(proxyHostString)) {
- //try https_proxy if no http_proxy
- proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
- System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
+ //try http_proxy if no https_proxy
+ proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
+ System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
}
- if (StringUtils.isBlank(proxyHostString)) {
- PROXY_ON = false;
- } else {
- // host format - http://domain:port/
- String[] parts = proxyHostString.replaceAll("/", "").split(":");
- if (parts.length != 3) {
- LOG.warn("Proxy host format is incorrect {}, e.g. http://domain:port/", proxyHostString);
- PROXY_ON = false;
- return;
+ if (!StringUtils.isBlank(proxyHostString)) {
+ URI uri = null;
+ try {
+ uri = new URI(proxyHostString);
+ } catch (URISyntaxException e) {
+ LOG.warn("Proxy uri doesn't follow correct syntax", e);
+ }
+ if (uri != null) {
+ PROXY_ON = true;
+ proxyClient = HttpProxyClient.newInstance(uri);
}
- PROXY_HOST = parts[1];
- PROXY_PORT = Integer.parseInt(parts[2]);
- LOG.info("Proxy protocol: {}, domain: {}, port: {}", parts[0], parts[1], parts[2]);
- PROXY_ON = true;
}
}
private HttpClient getAsyncClient() {
SslContextFactory sslContextFactory = new SslContextFactory();
HttpClient httpClient = new HttpClient(sslContextFactory);
-
// Configure HttpClient
httpClient.setFollowRedirects(false);
httpClient.setMaxConnectionsPerDestination(100);
+
// Config considerations
- //TODO(khalid): consider using proxy
- //TODO(khalid): consider whether require to follow redirects
//TODO(khalid): consider multi-threaded connection manager case
-
return httpClient;
}
@@ -159,7 +158,11 @@ public class ZeppelinhubRestApiHandler {
return StringUtils.EMPTY;
}
String url = zepelinhubUrl + argument;
- return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
+ if (PROXY_ON) {
+ return sendToZeppelinHubViaProxy(new HttpGet(url), StringUtils.EMPTY, token, true);
+ } else {
+ return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
+ }
}
public String putWithResponseBody(String token, String url, String json) throws IOException {
@@ -167,7 +170,11 @@ public class ZeppelinhubRestApiHandler {
LOG.error("Empty note, cannot send it to zeppelinHub");
throw new IOException("Cannot send emtpy note to zeppelinHub");
}
- return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
+ if (PROXY_ON) {
+ return sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl + url), json, token, true);
+ } else {
+ return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
+ }
}
public void put(String token, String jsonNote) throws IOException {
@@ -175,7 +182,11 @@ public class ZeppelinhubRestApiHandler {
LOG.error("Cannot save empty note/string to ZeppelinHub");
return;
}
- sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
+ if (PROXY_ON) {
+ sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl), jsonNote, token, false);
+ } else {
+ sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
+ }
}
public void del(String token, String argument) throws IOException {
@@ -183,7 +194,37 @@ public class ZeppelinhubRestApiHandler {
LOG.error("Cannot delete empty note from ZeppelinHub");
return;
}
- sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false);
+ if (PROXY_ON) {
+ sendToZeppelinHubViaProxy(new HttpDelete(zepelinhubUrl + argument), StringUtils.EMPTY, token,
+ false);
+ } else {
+ sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token,
+ false);
+ }
+ }
+
+ private String sendToZeppelinHubViaProxy(HttpRequestBase request,
+ String json,
+ String token,
+ boolean withResponse) throws IOException {
+ request.setHeader(ZEPPELIN_TOKEN_HEADER, token);
+ if (request.getMethod().equals(HttpPost.METHOD_NAME)) {
+ HttpPost post = (HttpPost) request;
+ StringEntity content = new StringEntity(json, "application/json;charset=UTF-8");
+ post.setEntity(content);
+ }
+ if (request.getMethod().equals(HttpPut.METHOD_NAME)) {
+ HttpPut put = (HttpPut) request;
+ StringEntity content = new StringEntity(json, "application/json;charset=UTF-8");
+ put.setEntity(content);
+ }
+ String body = StringUtils.EMPTY;
+ if (proxyClient != null) {
+ body = proxyClient.sendToZeppelinHub(request, withResponse);
+ } else {
+ LOG.warn("Proxy client request was submitted while not correctly initialized");
+ }
+ return body;
}
private String sendToZeppelinHub(HttpMethod method,
@@ -243,6 +284,9 @@ public class ZeppelinhubRestApiHandler {
public void close() {
try {
client.stop();
+ if (proxyClient != null) {
+ proxyClient.stop();
+ }
} catch (Exception e) {
LOG.info("Couldn't stop ZeppelinHub client properly", e);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
index 9847e1c..b072251 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
@@ -137,9 +137,22 @@ public class ZeppelinClient {
new Timer().schedule(new java.util.TimerTask() {
@Override
public void run() {
- watcherSession = openWatcherSession();
+ int time = 0;
+ while (time < 5 * MIN) {
+ watcherSession = openWatcherSession();
+ if (watcherSession == null) {
+ try {
+ Thread.sleep(5000);
+ time += 5;
+ } catch (InterruptedException e) {
+ //continue
+ }
+ } else {
+ break;
+ }
+ }
}
- }, 10000);
+ }, 5000);
}
public void stop() {