You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 06:50:02 UTC
[09/11] zeppelin git commit: Revert "[ZEPPELIN-2627] Interpreter
refactor"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
deleted file mode 100644
index bb90dd8..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.thrift.TException;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Proxy for Interpreter instance that runs on separate process
- */
-public class RemoteInterpreter extends Interpreter {
- private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class);
- private static final Gson gson = new Gson();
-
-
- private String className;
- private String sessionId;
- private String userName;
- private FormType formType;
-
- private RemoteInterpreterProcess interpreterProcess;
- private volatile boolean isOpened = false;
- private volatile boolean isCreated = false;
-
- /**
- * Remote interpreter and manage interpreter process
- */
- public RemoteInterpreter(Properties properties,
- String sessionId,
- String className,
- String userName) {
- super(properties);
- this.sessionId = sessionId;
- this.className = className;
- this.userName = userName;
- }
-
- public boolean isOpened() {
- return isOpened;
- }
-
- @Override
- public String getClassName() {
- return className;
- }
-
- public String getSessionId() {
- return this.sessionId;
- }
-
- public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
- if (this.interpreterProcess != null) {
- return this.interpreterProcess;
- }
- InterpreterGroup intpGroup = getInterpreterGroup();
- this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess();
- synchronized (interpreterProcess) {
- if (!interpreterProcess.isRunning()) {
- interpreterProcess.start(userName, false);
- interpreterProcess.getRemoteInterpreterEventPoller()
- .setInterpreterProcess(interpreterProcess);
- interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
- interpreterProcess.getRemoteInterpreterEventPoller().start();
- }
- }
- return interpreterProcess;
- }
-
- @Override
- public void open() {
- synchronized (this) {
- if (!isOpened) {
- // create all the interpreters of the same session first, then Open the internal interpreter
- // of this RemoteInterpreter.
- // The why we we create all the interpreter of the session is because some interpreter
- // depends on other interpreter. e.g. PySparkInterpreter depends on SparkInterpreter.
- // also see method Interpreter.getInterpreterInTheSameSessionByClassName
- for (Interpreter interpreter : getInterpreterGroup().getOrCreateSession(
- userName, sessionId)) {
- ((RemoteInterpreter) interpreter).internal_create();
- }
-
- interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
- @Override
- public Void call(Client client) throws Exception {
- LOGGER.info("Open RemoteInterpreter {}", getClassName());
- client.open(sessionId, className);
- // Push angular object loaded from JSON file to remote interpreter
- synchronized (getInterpreterGroup()) {
- if (!getInterpreterGroup().isAngularRegistryPushed()) {
- pushAngularObjectRegistryToRemote(client);
- getInterpreterGroup().setAngularRegistryPushed(true);
- }
- }
- return null;
- }
- });
- isOpened = true;
- }
- }
- }
-
- private void internal_create() {
- synchronized (this) {
- if (!isCreated) {
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
- interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
- @Override
- public Void call(Client client) throws Exception {
- LOGGER.info("Create RemoteInterpreter {}", getClassName());
- client.createInterpreter(getInterpreterGroup().getId(), sessionId,
- className, (Map) property, userName);
- return null;
- }
- });
- isCreated = true;
- }
- }
- }
-
-
- @Override
- public void close() {
- if (isOpened) {
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
- interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
- @Override
- public Void call(Client client) throws Exception {
- client.close(sessionId, className);
- return null;
- }
- });
- } else {
- LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
- }
- }
-
- @Override
- public InterpreterResult interpret(final String st, final InterpreterContext context) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("st:\n{}", st);
- }
-
- final FormType form = getFormType();
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
- InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
- .getInterpreterContextRunnerPool();
- List<InterpreterContextRunner> runners = context.getRunners();
- if (runners != null && runners.size() != 0) {
- // assume all runners in this InterpreterContext have the same note id
- String noteId = runners.get(0).getNoteId();
-
- interpreterContextRunnerPool.clear(noteId);
- interpreterContextRunnerPool.addAll(noteId, runners);
- }
- return interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
- @Override
- public InterpreterResult call(Client client) throws Exception {
-
- RemoteInterpreterResult remoteResult = client.interpret(
- sessionId, className, st, convert(context));
- Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
- remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
- }.getType());
- context.getConfig().clear();
- context.getConfig().putAll(remoteConfig);
- GUI currentGUI = context.getGui();
- if (form == FormType.NATIVE) {
- GUI remoteGui = GUI.fromJson(remoteResult.getGui());
- currentGUI.clear();
- currentGUI.setParams(remoteGui.getParams());
- currentGUI.setForms(remoteGui.getForms());
- } else if (form == FormType.SIMPLE) {
- final Map<String, Input> currentForms = currentGUI.getForms();
- final Map<String, Object> currentParams = currentGUI.getParams();
- final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
- final Map<String, Input> remoteForms = remoteGUI.getForms();
- final Map<String, Object> remoteParams = remoteGUI.getParams();
- currentForms.putAll(remoteForms);
- currentParams.putAll(remoteParams);
- }
-
- InterpreterResult result = convert(remoteResult);
- return result;
- }
- }
- );
-
- }
-
- @Override
- public void cancel(final InterpreterContext context) {
- if (!isOpened) {
- LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
- return;
- }
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
- interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
- @Override
- public Void call(Client client) throws Exception {
- client.cancel(sessionId, className, convert(context));
- return null;
- }
- });
- }
-
- @Override
- public FormType getFormType() {
- if (formType != null) {
- return formType;
- }
-
- // it is possible to call getFormType before it is opened
- synchronized (this) {
- if (!isOpened) {
- open();
- }
- }
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
- FormType type = interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<FormType>() {
- @Override
- public FormType call(Client client) throws Exception {
- formType = FormType.valueOf(client.getFormType(sessionId, className));
- return formType;
- }
- });
- return type;
- }
-
- @Override
- public int getProgress(final InterpreterContext context) {
- if (!isOpened) {
- LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
- return 0;
- }
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
- return interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<Integer>() {
- @Override
- public Integer call(Client client) throws Exception {
- return client.getProgress(sessionId, className, convert(context));
- }
- });
- }
-
-
- @Override
- public List<InterpreterCompletion> completion(final String buf, final int cursor,
- final InterpreterContext interpreterContext) {
- if (!isOpened) {
- LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
- return new ArrayList<>();
- }
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
- return interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
- @Override
- public List<InterpreterCompletion> call(Client client) throws Exception {
- return client.completion(sessionId, className, buf, cursor,
- convert(interpreterContext));
- }
- });
- }
-
- public String getStatus(final String jobId) {
- if (!isOpened) {
- LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
- return Job.Status.UNKNOWN.name();
- }
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
- return interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<String>() {
- @Override
- public String call(Client client) throws Exception {
- return client.getStatus(sessionId, jobId);
- }
- });
- }
-
- //TODO(zjffdu) Share the Scheduler in the same session or in the same InterpreterGroup ?
- @Override
- public Scheduler getScheduler() {
- int maxConcurrency = Integer.parseInt(
- property.getProperty("zeppelin.interpreter.max.poolsize",
- ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + ""));
- return SchedulerFactory.singleton().createOrGetRemoteScheduler(
- RemoteInterpreter.class.getName() + "-" + sessionId,
- sessionId, this, maxConcurrency);
- }
-
- private RemoteInterpreterContext convert(InterpreterContext ic) {
- return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
- ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
- gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
- }
-
- private InterpreterResult convert(RemoteInterpreterResult result) {
- InterpreterResult r = new InterpreterResult(
- InterpreterResult.Code.valueOf(result.getCode()));
-
- for (RemoteInterpreterResultMessage m : result.getMsg()) {
- r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
- }
-
- return r;
- }
-
- /**
- * Push local angular object registry to
- * remote interpreter. This method should be
- * call ONLY once when the first Interpreter is created
- */
- private void pushAngularObjectRegistryToRemote(Client client) throws TException {
- final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
- .getAngularObjectRegistry();
- if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
- final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
- .getRegistry();
- LOGGER.info("Push local angular object registry from ZeppelinServer to" +
- " remote interpreter group {}", this.getInterpreterGroup().getId());
- final java.lang.reflect.Type registryType = new TypeToken<Map<String,
- Map<String, AngularObject>>>() {
- }.getType();
- client.angularRegistryPush(gson.toJson(registry, registryType));
- }
- }
-
- @Override
- public String toString() {
- return "RemoteInterpreter_" + className + "_" + sessionId;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index f3bce2f..26c9d79 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -29,7 +29,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.Resource;
@@ -39,7 +38,6 @@ import org.apache.zeppelin.resource.ResourceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
@@ -86,6 +84,7 @@ public class RemoteInterpreterEventPoller extends Thread {
@Override
public void run() {
+ Client client = null;
AppendOutputRunner runner = new AppendOutputRunner(listener);
ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
@@ -101,14 +100,26 @@ public class RemoteInterpreterEventPoller extends Thread {
continue;
}
- RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() {
- @Override
- public RemoteInterpreterEvent call(Client client) throws Exception {
- return client.getEvent();
- }
- }
- );
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e1) {
+ logger.error("Can't get RemoteInterpreterEvent", e1);
+ waitQuietly();
+ continue;
+ }
+
+ RemoteInterpreterEvent event = null;
+ boolean broken = false;
+ try {
+ event = client.getEvent();
+ } catch (TException e) {
+ broken = true;
+ logger.error("Can't get RemoteInterpreterEvent", e);
+ waitQuietly();
+ continue;
+ } finally {
+ interpreterProcess.releaseClient(client, broken);
+ }
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
@@ -275,7 +286,10 @@ public class RemoteInterpreterEventPoller extends Thread {
boolean broken = false;
final Gson gson = new Gson();
final String eventOwnerKey = reqResourceBody.getOwnerKey();
+ Client interpreterServerMain = null;
try {
+ interpreterServerMain = interpreterProcess.getClient();
+ final Client eventClient = interpreterServerMain;
if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
@@ -294,6 +308,7 @@ public class RemoteInterpreterEventPoller extends Thread {
@Override
public void onFinished(Object resultObject) {
+ boolean clientBroken = false;
if (resultObject != null && resultObject instanceof List) {
List<InterpreterContextRunner> runnerList =
(List<InterpreterContextRunner>) resultObject;
@@ -309,15 +324,15 @@ public class RemoteInterpreterEventPoller extends Thread {
resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
resResource.setData(remoteRunners);
- interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<Void>() {
- @Override
- public Void call(Client client) throws Exception {
- client.onReceivedZeppelinResource(resResource.toJson());
- return null;
- }
- }
- );
+ try {
+ eventClient.onReceivedZeppelinResource(resResource.toJson());
+ } catch (Exception e) {
+ clientBroken = true;
+ logger.error("Can't get RemoteInterpreterEvent", e);
+ waitQuietly();
+ } finally {
+ interpreterProcess.releaseClient(eventClient, clientBroken);
+ }
}
}
@@ -331,32 +346,39 @@ public class RemoteInterpreterEventPoller extends Thread {
reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent);
}
} catch (Exception e) {
+ broken = true;
logger.error("Can't get RemoteInterpreterEvent", e);
waitQuietly();
+ } finally {
+ interpreterProcess.releaseClient(interpreterServerMain, broken);
}
}
- private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) {
- interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<Void>() {
- @Override
- public Void call(Client client) throws Exception {
- List<String> resourceList = new LinkedList<>();
- for (Resource r : resourceSet) {
- resourceList.add(r.toJson());
- }
- client.resourcePoolResponseGetAll(resourceList);
- return null;
- }
- }
- );
+ private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
+ Client client = null;
+ boolean broken = false;
+ try {
+ client = interpreterProcess.getClient();
+ List<String> resourceList = new LinkedList<>();
+ Gson gson = new Gson();
+ for (Resource r : resourceSet) {
+ resourceList.add(gson.toJson(r));
+ }
+ client.resourcePoolResponseGetAll(resourceList);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ broken = true;
+ } finally {
+ if (client != null) {
+ interpreterProcess.releaseClient(client, broken);
+ }
+ }
}
private ResourceSet getAllResourcePoolExcept() {
ResourceSet resourceSet = new ResourceSet();
- for (InterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting()
- .getInterpreterSettingManager().getAllInterpreterGroup()) {
+ for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
if (intpGroup.getId().equals(interpreterGroup.getId())) {
continue;
}
@@ -368,94 +390,115 @@ public class RemoteInterpreterEventPoller extends Thread {
resourceSet.addAll(localPool.getAll());
}
} else if (interpreterProcess.isRunning()) {
- List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
- @Override
- public List<String> call(Client client) throws Exception {
- return client.resourcePoolGetAll();
- }
- }
- );
- for (String res : resourceList) {
- resourceSet.add(Resource.fromJson(res));
+ Client client = null;
+ boolean broken = false;
+ try {
+ client = remoteInterpreterProcess.getClient();
+ List<String> resourceList = client.resourcePoolGetAll();
+ Gson gson = new Gson();
+ for (String res : resourceList) {
+ resourceSet.add(Resource.fromJson(res));
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ broken = true;
+ } finally {
+ if (client != null) {
+ intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+ }
}
}
}
return resourceSet;
}
- private void sendResourceResponseGet(final ResourceId resourceId, final Object o) {
- interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<Void>() {
- @Override
- public Void call(Client client) throws Exception {
- String rid = resourceId.toJson();
- ByteBuffer obj;
- if (o == null) {
- obj = ByteBuffer.allocate(0);
- } else {
- obj = Resource.serializeObject(o);
- }
- client.resourceResponseGet(rid, obj);
- return null;
- }
- }
- );
+ private void sendResourceResponseGet(ResourceId resourceId, Object o) {
+ Client client = null;
+ boolean broken = false;
+ try {
+ client = interpreterProcess.getClient();
+ Gson gson = new Gson();
+ String rid = gson.toJson(resourceId);
+ ByteBuffer obj;
+ if (o == null) {
+ obj = ByteBuffer.allocate(0);
+ } else {
+ obj = Resource.serializeObject(o);
+ }
+ client.resourceResponseGet(rid, obj);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ broken = true;
+ } finally {
+ if (client != null) {
+ interpreterProcess.releaseClient(client, broken);
+ }
+ }
}
- private Object getResource(final ResourceId resourceId) {
- InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
- .getInterpreterSettingManager()
- .getInterpreterGroupById(resourceId.getResourcePoolId());
+ private Object getResource(ResourceId resourceId) {
+ InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
+ resourceId.getResourcePoolId());
if (intpGroup == null) {
return null;
}
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
- ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
- @Override
- public ByteBuffer call(Client client) throws Exception {
- return client.resourceGet(
- resourceId.getNoteId(),
- resourceId.getParagraphId(),
- resourceId.getName());
- }
+ if (remoteInterpreterProcess == null) {
+ ResourcePool localPool = intpGroup.getResourcePool();
+ if (localPool != null) {
+ return localPool.get(resourceId.getName());
+ }
+ } else if (interpreterProcess.isRunning()) {
+ Client client = null;
+ boolean broken = false;
+ try {
+ client = remoteInterpreterProcess.getClient();
+ ByteBuffer res = client.resourceGet(
+ resourceId.getNoteId(),
+ resourceId.getParagraphId(),
+ resourceId.getName());
+ Object o = Resource.deserializeObject(res);
+ return o;
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ broken = true;
+ } finally {
+ if (client != null) {
+ intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
}
- );
+ }
+ }
+ return null;
+ }
+ public void sendInvokeMethodResult(InvokeResourceMethodEventMessage message, Object o) {
+ Client client = null;
+ boolean broken = false;
try {
- Object o = Resource.deserializeObject(buffer);
- return o;
+ client = interpreterProcess.getClient();
+ Gson gson = new Gson();
+ String invokeMessage = gson.toJson(message);
+ ByteBuffer obj;
+ if (o == null) {
+ obj = ByteBuffer.allocate(0);
+ } else {
+ obj = Resource.serializeObject(o);
+ }
+ client.resourceResponseInvokeMethod(invokeMessage, obj);
} catch (Exception e) {
logger.error(e.getMessage(), e);
+ broken = true;
+ } finally {
+ if (client != null) {
+ interpreterProcess.releaseClient(client, broken);
+ }
}
- return null;
}
- public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message,
- final Object o) {
- interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<Void>() {
- @Override
- public Void call(Client client) throws Exception {
- String invokeMessage = message.toJson();
- ByteBuffer obj;
- if (o == null) {
- obj = ByteBuffer.allocate(0);
- } else {
- obj = Resource.serializeObject(o);
- }
- client.resourceResponseInvokeMethod(invokeMessage, obj);
- return null;
- }
- }
- );
- }
-
- private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) {
- final ResourceId resourceId = message.resourceId;
- InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
- .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId());
+ private Object invokeResourceMethod(InvokeResourceMethodEventMessage message) {
+ ResourceId resourceId = message.resourceId;
+ InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
+ resourceId.getResourcePoolId());
if (intpGroup == null) {
return null;
}
@@ -486,25 +529,25 @@ public class RemoteInterpreterEventPoller extends Thread {
return null;
}
} else if (interpreterProcess.isRunning()) {
- ByteBuffer res = interpreterProcess.callRemoteFunction(
- new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
- @Override
- public ByteBuffer call(Client client) throws Exception {
- return client.resourceInvokeMethod(
- resourceId.getNoteId(),
- resourceId.getParagraphId(),
- resourceId.getName(),
- message.toJson());
- }
- }
- );
-
+ Client client = null;
+ boolean broken = false;
try {
- return Resource.deserializeObject(res);
+ client = remoteInterpreterProcess.getClient();
+ ByteBuffer res = client.resourceInvokeMethod(
+ resourceId.getNoteId(),
+ resourceId.getParagraphId(),
+ resourceId.getName(),
+ gson.toJson(message));
+ Object o = Resource.deserializeObject(res);
+ return o;
} catch (Exception e) {
logger.error(e.getMessage(), e);
+ broken = true;
+ } finally {
+ if (client != null) {
+ intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+ }
}
- return null;
}
return null;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
deleted file mode 100644
index 19356fb..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.commons.exec.*;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * This class manages start / stop of remote interpreter process
- */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
- implements ExecuteResultHandler {
- private static final Logger logger = LoggerFactory.getLogger(
- RemoteInterpreterManagedProcess.class);
- private final String interpreterRunner;
-
- private DefaultExecutor executor;
- private ExecuteWatchdog watchdog;
- boolean running = false;
- private int port = -1;
- private final String interpreterDir;
- private final String localRepoDir;
- private final String interpreterGroupName;
-
- private Map<String, String> env;
-
- public RemoteInterpreterManagedProcess(
- String intpRunner,
- String intpDir,
- String localRepoDir,
- Map<String, String> env,
- int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener,
- String interpreterGroupName) {
- super(new RemoteInterpreterEventPoller(listener, appListener),
- connectTimeout);
- this.interpreterRunner = intpRunner;
- this.env = env;
- this.interpreterDir = intpDir;
- this.localRepoDir = localRepoDir;
- this.interpreterGroupName = interpreterGroupName;
- }
-
- RemoteInterpreterManagedProcess(String intpRunner,
- String intpDir,
- String localRepoDir,
- Map<String, String> env,
- RemoteInterpreterEventPoller remoteInterpreterEventPoller,
- int connectTimeout,
- String interpreterGroupName) {
- super(remoteInterpreterEventPoller,
- connectTimeout);
- this.interpreterRunner = intpRunner;
- this.env = env;
- this.interpreterDir = intpDir;
- this.localRepoDir = localRepoDir;
- this.interpreterGroupName = interpreterGroupName;
- }
-
- @Override
- public String getHost() {
- return "localhost";
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public void start(String userName, Boolean isUserImpersonate) {
- // start server process
- try {
- port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
- logger.info("Choose port {} for RemoteInterpreterProcess", port);
- } catch (IOException e1) {
- throw new InterpreterException(e1);
- }
-
- CommandLine cmdLine = CommandLine.parse(interpreterRunner);
- cmdLine.addArgument("-d", false);
- cmdLine.addArgument(interpreterDir, false);
- cmdLine.addArgument("-p", false);
- cmdLine.addArgument(Integer.toString(port), false);
- if (isUserImpersonate && !userName.equals("anonymous")) {
- cmdLine.addArgument("-u", false);
- cmdLine.addArgument(userName, false);
- }
- cmdLine.addArgument("-l", false);
- cmdLine.addArgument(localRepoDir, false);
- cmdLine.addArgument("-g", false);
- cmdLine.addArgument(interpreterGroupName, false);
-
- executor = new DefaultExecutor();
-
- ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
- ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
- processOutput.setOutputStream(cmdOut);
-
- executor.setStreamHandler(new PumpStreamHandler(processOutput));
- watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
- executor.setWatchdog(watchdog);
-
- try {
- Map procEnv = EnvironmentUtils.getProcEnvironment();
- procEnv.putAll(env);
-
- logger.info("Run interpreter process {}", cmdLine);
- executor.execute(cmdLine, procEnv, this);
- running = true;
- } catch (IOException e) {
- running = false;
- throw new InterpreterException(e);
- }
-
-
- long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
- if (!running) {
- try {
- cmdOut.flush();
- } catch (IOException e) {
- // nothing to do
- }
- throw new InterpreterException(new String(cmdOut.toByteArray()));
- }
-
- try {
- if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
- break;
- } else {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
- "Thread.sleep", e);
- }
- }
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Remote interpreter not yet accessible at localhost:" + port);
- }
- }
- }
- processOutput.setOutputStream(null);
- }
-
- public void stop() {
- if (isRunning()) {
- logger.info("kill interpreter process");
- try {
- callRemoteFunction(new RemoteFunction<Void>() {
- @Override
- public Void call(RemoteInterpreterService.Client client) throws Exception {
- client.shutdown();
- return null;
- }
- });
- } catch (Exception e) {
- logger.warn("ignore the exception when shutting down");
- }
- watchdog.destroyProcess();
- }
-
- executor = null;
- watchdog = null;
- running = false;
- logger.info("Remote process terminated");
- }
-
- @Override
- public void onProcessComplete(int exitValue) {
- logger.info("Interpreter process exited {}", exitValue);
- running = false;
-
- }
-
- @Override
- public void onProcessFailed(ExecuteException e) {
- logger.info("Interpreter process failed {}", e);
- running = false;
- }
-
- public boolean isRunning() {
- return running;
- }
-
- private static class ProcessLogOutputStream extends LogOutputStream {
-
- private Logger logger;
- OutputStream out;
-
- public ProcessLogOutputStream(Logger logger) {
- this.logger = logger;
- }
-
- @Override
- protected void processLine(String s, int i) {
- this.logger.debug(s);
- }
-
- @Override
- public void write(byte [] b) throws IOException {
- super.write(b);
-
- if (out != null) {
- synchronized (this) {
- if (out != null) {
- out.write(b);
- }
- }
- }
- }
-
- @Override
- public void write(byte [] b, int offset, int len) throws IOException {
- super.write(b, offset, len);
-
- if (out != null) {
- synchronized (this) {
- if (out != null) {
- out.write(b, offset, len);
- }
- }
- }
- }
-
- public void setOutputStream(OutputStream out) {
- synchronized (this) {
- this.out = out;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index a78088c..1d48a1e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -20,13 +20,10 @@ import com.google.gson.Gson;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -35,6 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class RemoteInterpreterProcess {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
+ // number of sessions that are attached to this process
+ private final AtomicInteger referenceCount;
+
private GenericObjectPool<Client> clientPool;
private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
@@ -46,20 +46,16 @@ public abstract class RemoteInterpreterProcess {
ApplicationEventListener appListener) {
this(new RemoteInterpreterEventPoller(listener, appListener),
connectTimeout);
- this.remoteInterpreterEventPoller.setInterpreterProcess(this);
}
RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
int connectTimeout) {
this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
+ referenceCount = new AtomicInteger(0);
this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
this.connectTimeout = connectTimeout;
}
- public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() {
- return remoteInterpreterEventPoller;
- }
-
public abstract String getHost();
public abstract int getPort();
public abstract void start(String userName, Boolean isUserImpersonate);
@@ -70,18 +66,37 @@ public abstract class RemoteInterpreterProcess {
return connectTimeout;
}
- public synchronized Client getClient() throws Exception {
+ public int reference(InterpreterGroup interpreterGroup, String userName,
+ Boolean isUserImpersonate) {
+ synchronized (referenceCount) {
+ if (!isRunning()) {
+ start(userName, isUserImpersonate);
+ }
+
+ if (clientPool == null) {
+ clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
+ clientPool.setTestOnBorrow(true);
+
+ remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
+ remoteInterpreterEventPoller.setInterpreterProcess(this);
+ remoteInterpreterEventPoller.start();
+ }
+ return referenceCount.incrementAndGet();
+ }
+ }
+
+ public Client getClient() throws Exception {
if (clientPool == null || clientPool.isClosed()) {
- clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
+ return null;
}
return clientPool.borrowObject();
}
- private void releaseClient(Client client) {
+ public void releaseClient(Client client) {
releaseClient(client, false);
}
- private void releaseClient(Client client, boolean broken) {
+ public void releaseClient(Client client, boolean broken) {
if (broken) {
releaseBrokenClient(client);
} else {
@@ -93,7 +108,7 @@ public abstract class RemoteInterpreterProcess {
}
}
- private void releaseBrokenClient(Client client) {
+ public void releaseBrokenClient(Client client) {
try {
clientPool.invalidateObject(client);
} catch (Exception e) {
@@ -101,6 +116,90 @@ public abstract class RemoteInterpreterProcess {
}
}
+ public int dereference() {
+ synchronized (referenceCount) {
+ int r = referenceCount.decrementAndGet();
+ if (r == 0) {
+ logger.info("shutdown interpreter process");
+ remoteInterpreterEventPoller.shutdown();
+
+ // first try shutdown
+ Client client = null;
+ try {
+ client = getClient();
+ client.shutdown();
+ } catch (Exception e) {
+ // safe to ignore this exception: client.shutdown() may terminate the remote process
+ logger.info("Exception in RemoteInterpreterProcess while synchronized dereference, can " +
+ "safely ignore exception while client.shutdown() may terminates remote process");
+ logger.debug(e.getMessage(), e);
+ } finally {
+ if (client != null) {
+ // no longer used
+ releaseBrokenClient(client);
+ }
+ }
+
+ clientPool.clear();
+ clientPool.close();
+
+ // wait for some time (connectTimeout) and force kill
+ // the remote process server.serve() loop does not always finish gracefully
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < connectTimeout) {
+ if (this.isRunning()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.error("Exception in RemoteInterpreterProcess while synchronized dereference " +
+ "Thread.sleep", e);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ return r;
+ }
+ }
+
+ public int referenceCount() {
+ synchronized (referenceCount) {
+ return referenceCount.get();
+ }
+ }
+
+ public int getNumActiveClient() {
+ if (clientPool == null) {
+ return 0;
+ } else {
+ return clientPool.getNumActive();
+ }
+ }
+
+ public int getNumIdleClient() {
+ if (clientPool == null) {
+ return 0;
+ } else {
+ return clientPool.getNumIdle();
+ }
+ }
+
+ public void setMaxPoolSize(int size) {
+ if (clientPool != null) {
+ // size + 2: extra clients for the progress poller and the cancel operation
+ clientPool.setMaxTotal(size + 2);
+ }
+ }
+
+ public int getMaxPoolSize() {
+ if (clientPool != null) {
+ return clientPool.getMaxTotal();
+ } else {
+ return 0;
+ }
+ }
+
/**
* Called when angular object is updated in client side to propagate
* change to the remote process
@@ -140,33 +239,4 @@ public abstract class RemoteInterpreterProcess {
public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
return interpreterContextRunnerPool;
}
-
- public <T> T callRemoteFunction(RemoteFunction<T> func) {
- Client client = null;
- boolean broken = false;
- try {
- client = getClient();
- if (client != null) {
- return func.call(client);
- }
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- } finally {
- if (client != null) {
- releaseClient(client, broken);
- }
- }
- return null;
- }
-
- /**
- *
- * @param <T>
- */
- public interface RemoteFunction<T> {
- T call(Client client) throws Exception;
- }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
deleted file mode 100644
index bb176be..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class connects to existing process
- */
-public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
- private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
- private final String host;
- private final int port;
-
- public RemoteInterpreterRunningProcess(
- int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener,
- String host,
- int port
- ) {
- super(connectTimeout, listener, appListener);
- this.host = host;
- this.port = port;
- }
-
- @Override
- public String getHost() {
- return host;
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public void start(String userName, Boolean isUserImpersonate) {
- // assume process is externally managed. nothing to do
- }
-
- @Override
- public void stop() {
- // assume process is externally managed. nothing to do
- }
-
- @Override
- public boolean isRunning() {
- return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 3d8123e..3853468 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -106,7 +106,6 @@ public class RemoteInterpreterServer
@Override
public void shutdown() throws TException {
- logger.info("Shutting down...");
eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
if (interpreterGroup != null) {
interpreterGroup.close();
@@ -160,7 +159,7 @@ public class RemoteInterpreterServer
}
@Override
- public void createInterpreter(String interpreterGroupId, String sessionId, String
+ public void createInterpreter(String interpreterGroupId, String sessionKey, String
className, Map<String, String> properties, String userName) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
@@ -191,11 +190,20 @@ public class RemoteInterpreterServer
replClass.getConstructor(new Class[] {Properties.class});
Interpreter repl = constructor.newInstance(p);
repl.setClassloaderUrls(new URL[]{});
+
+ synchronized (interpreterGroup) {
+ List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+ if (interpreters == null) {
+ interpreters = new LinkedList<>();
+ interpreterGroup.put(sessionKey, interpreters);
+ }
+
+ interpreters.add(new LazyOpenInterpreter(repl));
+ }
+
logger.info("Instantiate interpreter {}", className);
repl.setInterpreterGroup(interpreterGroup);
repl.setUserName(userName);
-
- interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException
| InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
@@ -229,13 +237,13 @@ public class RemoteInterpreterServer
}
}
- protected Interpreter getInterpreter(String sessionId, String className) throws TException {
+ protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
if (interpreterGroup == null) {
throw new TException(
new InterpreterException("Interpreter instance " + className + " not created"));
}
synchronized (interpreterGroup) {
- List<Interpreter> interpreters = interpreterGroup.get(sessionId);
+ List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters == null) {
throw new TException(
new InterpreterException("Interpreter " + className + " not initialized"));
@@ -251,20 +259,19 @@ public class RemoteInterpreterServer
}
@Override
- public void open(String sessionId, String className) throws TException {
- logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId));
- Interpreter intp = getInterpreter(sessionId, className);
+ public void open(String noteId, String className) throws TException {
+ Interpreter intp = getInterpreter(noteId, className);
intp.open();
}
@Override
- public void close(String sessionId, String className) throws TException {
+ public void close(String sessionKey, String className) throws TException {
// unload all applications
for (String appId : runningApplications.keySet()) {
RunningApplication appInfo = runningApplications.get(appId);
// see NoteInterpreterLoader.SHARED_SESSION
- if (appInfo.noteId.equals(sessionId) || sessionId.equals("shared_session")) {
+ if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) {
try {
logger.info("Unload App {} ", appInfo.pkg.getName());
appInfo.app.unload();
@@ -279,7 +286,7 @@ public class RemoteInterpreterServer
// close interpreters
List<Interpreter> interpreters;
synchronized (interpreterGroup) {
- interpreters = interpreterGroup.get(sessionId);
+ interpreters = interpreterGroup.get(sessionKey);
}
if (interpreters != null) {
Iterator<Interpreter> it = interpreters.iterator();
@@ -315,6 +322,7 @@ public class RemoteInterpreterServer
intp,
st,
context);
+
scheduler.submit(job);
while (!job.isTerminated()) {
@@ -558,34 +566,30 @@ public class RemoteInterpreterServer
}
@Override
- public int getProgress(String sessionId, String className,
+ public int getProgress(String noteId, String className,
RemoteInterpreterContext interpreterContext)
throws TException {
Integer manuallyProvidedProgress = progressMap.get(interpreterContext.getParagraphId());
if (manuallyProvidedProgress != null) {
return manuallyProvidedProgress;
} else {
- Interpreter intp = getInterpreter(sessionId, className);
- if (intp == null) {
- throw new TException("No interpreter {} existed for session {}".format(
- className, sessionId));
- }
+ Interpreter intp = getInterpreter(noteId, className);
return intp.getProgress(convert(interpreterContext, null));
}
}
@Override
- public String getFormType(String sessionId, String className) throws TException {
- Interpreter intp = getInterpreter(sessionId, className);
+ public String getFormType(String noteId, String className) throws TException {
+ Interpreter intp = getInterpreter(noteId, className);
return intp.getFormType().toString();
}
@Override
- public List<InterpreterCompletion> completion(String sessionId,
+ public List<InterpreterCompletion> completion(String noteId,
String className, String buf, int cursor, RemoteInterpreterContext remoteInterpreterContext)
throws TException {
- Interpreter intp = getInterpreter(sessionId, className);
+ Interpreter intp = getInterpreter(noteId, className);
List completion = intp.completion(buf, cursor, convert(remoteInterpreterContext, null));
return completion;
}
@@ -762,16 +766,16 @@ public class RemoteInterpreterServer
}
@Override
- public String getStatus(String sessionId, String jobId)
+ public String getStatus(String sessionKey, String jobId)
throws TException {
if (interpreterGroup == null) {
- return Status.UNKNOWN.name();
+ return "Unknown";
}
synchronized (interpreterGroup) {
- List<Interpreter> interpreters = interpreterGroup.get(sessionId);
+ List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters == null) {
- return Status.UNKNOWN.name();
+ return "Unknown";
}
for (Interpreter intp : interpreters) {
@@ -788,7 +792,7 @@ public class RemoteInterpreterServer
}
}
}
- return Status.UNKNOWN.name();
+ return "Unknown";
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
new file mode 100644
index 0000000..b26995a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.resource;
+
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Utilities for ResourcePool
+ */
+public class ResourcePoolUtils {
+ static Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class);
+
+ public static ResourceSet getAllResources() {
+ return getAllResourcesExcept(null);
+ }
+
+ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
+ ResourceSet resourceSet = new ResourceSet();
+ for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+ if (interpreterGroupExcludsion != null &&
+ intpGroup.getId().equals(interpreterGroupExcludsion)) {
+ continue;
+ }
+
+ RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+ if (remoteInterpreterProcess == null) {
+ ResourcePool localPool = intpGroup.getResourcePool();
+ if (localPool != null) {
+ resourceSet.addAll(localPool.getAll());
+ }
+ } else if (remoteInterpreterProcess.isRunning()) {
+ RemoteInterpreterService.Client client = null;
+ boolean broken = false;
+ try {
+ client = remoteInterpreterProcess.getClient();
+ if (client == null) {
+ // remote interpreter may not have started yet, or may have terminated.
+ continue;
+ }
+ List<String> resourceList = client.resourcePoolGetAll();
+ for (String res : resourceList) {
+ resourceSet.add(Resource.fromJson(res));
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ broken = true;
+ } finally {
+ if (client != null) {
+ intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+ }
+ }
+ }
+ }
+ return resourceSet;
+ }
+
+ public static void removeResourcesBelongsToNote(String noteId) {
+ removeResourcesBelongsToParagraph(noteId, null);
+ }
+
+ public static void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
+ for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+ ResourceSet resourceSet = new ResourceSet();
+ RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+ if (remoteInterpreterProcess == null) {
+ ResourcePool localPool = intpGroup.getResourcePool();
+ if (localPool != null) {
+ resourceSet.addAll(localPool.getAll());
+ }
+ if (noteId != null) {
+ resourceSet = resourceSet.filterByNoteId(noteId);
+ }
+ if (paragraphId != null) {
+ resourceSet = resourceSet.filterByParagraphId(paragraphId);
+ }
+
+ for (Resource r : resourceSet) {
+ localPool.remove(
+ r.getResourceId().getNoteId(),
+ r.getResourceId().getParagraphId(),
+ r.getResourceId().getName());
+ }
+ } else if (remoteInterpreterProcess.isRunning()) {
+ RemoteInterpreterService.Client client = null;
+ boolean broken = false;
+ try {
+ client = remoteInterpreterProcess.getClient();
+ List<String> resourceList = client.resourcePoolGetAll();
+ for (String res : resourceList) {
+ resourceSet.add(Resource.fromJson(res));
+ }
+
+ if (noteId != null) {
+ resourceSet = resourceSet.filterByNoteId(noteId);
+ }
+ if (paragraphId != null) {
+ resourceSet = resourceSet.filterByParagraphId(paragraphId);
+ }
+
+ for (Resource r : resourceSet) {
+ client.resourceRemove(
+ r.getResourceId().getNoteId(),
+ r.getResourceId().getParagraphId(),
+ r.getResourceId().getName());
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ broken = true;
+ } finally {
+ if (client != null) {
+ intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+ }
+ }
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 191902a..d0025d8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -41,7 +41,6 @@ public abstract class Job {
/**
* Job status.
*
- * UNKNOWN - Job is not found in remote
* READY - Job is not running, ready to run.
* PENDING - Job is submitted to scheduler. but not running yet
* RUNNING - Job is running.
@@ -49,8 +48,8 @@ public abstract class Job {
* ERROR - Job finished run. with error
* ABORT - Job finished by abort
*/
- public enum Status {
- UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
+ public static enum Status {
+ READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
public boolean isReady() {
return this == READY;
@@ -71,14 +70,14 @@ public abstract class Job {
Date dateCreated;
Date dateStarted;
Date dateFinished;
- volatile Status status;
+ Status status;
static Logger LOGGER = LoggerFactory.getLogger(Job.class);
transient boolean aborted = false;
- private volatile String errorMessage;
- private transient volatile Throwable exception;
+ private String errorMessage;
+ private transient Throwable exception;
private transient JobListener listener;
private long progressUpdateIntervalMs;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index e41540b..f9ddc4e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -17,9 +17,11 @@
package org.apache.zeppelin.scheduler;
+import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +34,6 @@ import java.util.concurrent.ExecutorService;
/**
* RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter
- *
*/
public class RemoteScheduler implements Scheduler {
Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
@@ -44,17 +45,17 @@ public class RemoteScheduler implements Scheduler {
boolean terminate = false;
private String name;
private int maxConcurrency;
- private final String sessionId;
- private RemoteInterpreter remoteInterpreter;
+ private final String noteId;
+ private RemoteInterpreterProcess interpreterProcess;
- public RemoteScheduler(String name, ExecutorService executor, String sessionId,
- RemoteInterpreter remoteInterpreter, SchedulerListener listener,
+ public RemoteScheduler(String name, ExecutorService executor, String noteId,
+ RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
int maxConcurrency) {
this.name = name;
this.executor = executor;
this.listener = listener;
- this.sessionId = sessionId;
- this.remoteInterpreter = remoteInterpreter;
+ this.noteId = noteId;
+ this.interpreterProcess = interpreterProcess;
this.maxConcurrency = maxConcurrency;
}
@@ -166,15 +167,14 @@ public class RemoteScheduler implements Scheduler {
private long initialPeriodMsec;
private long initialPeriodCheckIntervalMsec;
private long checkIntervalMsec;
- private volatile boolean terminate;
+ private boolean terminate;
private JobListener listener;
private Job job;
- volatile Status lastStatus;
+ Status lastStatus;
public JobStatusPoller(long initialPeriodMsec,
long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
JobListener listener) {
- setName("JobStatusPoller-" + job.getId());
this.initialPeriodMsec = initialPeriodMsec;
this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
this.checkIntervalMsec = checkIntervalMsec;
@@ -209,7 +209,7 @@ public class RemoteScheduler implements Scheduler {
}
Status newStatus = getStatus();
- if (newStatus == Status.UNKNOWN) { // unknown
+ if (newStatus == null) { // unknown
continue;
}
@@ -231,9 +231,7 @@ public class RemoteScheduler implements Scheduler {
private Status getLastStatus() {
if (terminate == true) {
- if (job.getErrorMessage() != null) {
- return Status.ERROR;
- } else if (lastStatus != Status.FINISHED &&
+ if (lastStatus != Status.FINISHED &&
lastStatus != Status.ERROR &&
lastStatus != Status.ABORT) {
return Status.FINISHED;
@@ -241,35 +239,58 @@ public class RemoteScheduler implements Scheduler {
return (lastStatus == null) ? Status.FINISHED : lastStatus;
}
} else {
- return (lastStatus == null) ? Status.UNKNOWN : lastStatus;
+ return (lastStatus == null) ? Status.FINISHED : lastStatus;
}
}
public synchronized Job.Status getStatus() {
- if (!remoteInterpreter.isOpened()) {
+ if (interpreterProcess.referenceCount() <= 0) {
return getLastStatus();
}
- Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId()));
- if (status == Status.UNKNOWN) {
- // not found this job in the remote schedulers.
- // maybe not submitted, maybe already finished
- //Status status = getLastStatus();
- listener.afterStatusChange(job, null, null);
- return job.getStatus();
+
+ Client client;
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e) {
+ logger.error("Can't get status information", e);
+ lastStatus = Status.ERROR;
+ return Status.ERROR;
+ }
+
+ boolean broken = false;
+ try {
+ String statusStr = client.getStatus(noteId, job.getId());
+ if ("Unknown".equals(statusStr)) {
+ // this job was not found in the remote schedulers:
+ // it may not have been submitted yet, or it may have already finished
+ //Status status = getLastStatus();
+ listener.afterStatusChange(job, null, null);
+ return job.getStatus();
+ }
+ Status status = Status.valueOf(statusStr);
+ lastStatus = status;
+ listener.afterStatusChange(job, null, status);
+ return status;
+ } catch (TException e) {
+ broken = true;
+ logger.error("Can't get status information", e);
+ lastStatus = Status.ERROR;
+ return Status.ERROR;
+ } catch (Exception e) {
+ logger.error("Unknown status", e);
+ lastStatus = Status.ERROR;
+ return Status.ERROR;
+ } finally {
+ interpreterProcess.releaseClient(client, broken);
}
- lastStatus = status;
- listener.afterStatusChange(job, null, status);
- return status;
}
}
- //TODO(zjffdu) need to refactor the schdule module which is too complicated
private class JobRunner implements Runnable, JobListener {
- private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
private Scheduler scheduler;
private Job job;
- private volatile boolean jobExecuted;
- volatile boolean jobSubmittedRemotely;
+ private boolean jobExecuted;
+ boolean jobSubmittedRemotely;
public JobRunner(Scheduler scheduler, Job job) {
this.scheduler = scheduler;
@@ -317,22 +338,20 @@ public class RemoteScheduler implements Scheduler {
}
// set job status based on result.
+ Status lastStatus = jobStatusPoller.getStatus();
Object jobResult = job.getReturn();
- if (job.isAborted()) {
- job.setStatus(Status.ABORT);
- } else if (job.getException() != null) {
-// logger.info("Job ABORT, " + job.getId());
- job.setStatus(Status.ERROR);
- } else if (jobResult != null && jobResult instanceof InterpreterResult
- && ((InterpreterResult) jobResult).code() == Code.ERROR) {
-// logger.info("Job Error, " + job.getId());
- job.setStatus(Status.ERROR);
- } else {
-// logger.info("Job Finished, " + job.getId());
- job.setStatus(Status.FINISHED);
+ if (jobResult != null && jobResult instanceof InterpreterResult) {
+ if (((InterpreterResult) jobResult).code() == Code.ERROR) {
+ lastStatus = Status.ERROR;
+ }
+ }
+ if (job.getException() != null) {
+ lastStatus = Status.ERROR;
}
synchronized (queue) {
+ job.setStatus(lastStatus);
+
if (listener != null) {
listener.jobFinished(scheduler, job);
}
@@ -355,6 +374,25 @@ public class RemoteScheduler implements Scheduler {
@Override
public void afterStatusChange(Job job, Status before, Status after) {
+ if (after == null) { // unknown: maybe not yet submitted remotely, maybe already finished.
+ if (jobExecuted) {
+ jobSubmittedRemotely = true;
+ Object jobResult = job.getReturn();
+ if (job.isAborted()) {
+ job.setStatus(Status.ABORT);
+ } else if (job.getException() != null) {
+ job.setStatus(Status.ERROR);
+ } else if (jobResult != null && jobResult instanceof InterpreterResult
+ && ((InterpreterResult) jobResult).code() == Code.ERROR) {
+ job.setStatus(Status.ERROR);
+ } else {
+ job.setStatus(Status.FINISHED);
+ }
+ }
+ return;
+ }
+
+
// Update remoteStatus
if (jobExecuted == false) {
if (after == Status.FINISHED || after == Status.ABORT
@@ -364,18 +402,14 @@ public class RemoteScheduler implements Scheduler {
return;
} else if (after == Status.RUNNING) {
jobSubmittedRemotely = true;
- job.setStatus(Status.RUNNING);
-// logger.info("Job RUNNING, " + job.getId());
}
} else {
jobSubmittedRemotely = true;
}
- // only set status when it is RUNNING
- // We would set other status based on the interpret result
- if (after == Status.RUNNING) {
-// logger.info("Job RUNNING, " + job.getId());
- job.setStatus(Status.RUNNING);
+ // status polled by status poller
+ if (job.getStatus() != after) {
+ job.setStatus(after);
}
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 5871ca5..af52dec 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -24,18 +24,17 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Factory class for creating schedulers
- *
+ * TODO(moon) : add description.
*/
public class SchedulerFactory implements SchedulerListener {
private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
- private ExecutorService executor;
- private Map<String, Scheduler> schedulers = new LinkedHashMap<>();
+ ExecutorService executor;
+ Map<String, Scheduler> schedulers = new LinkedHashMap<>();
private static SchedulerFactory singleton;
private static Long singletonLock = new Long(0);
@@ -55,17 +54,17 @@ public class SchedulerFactory implements SchedulerListener {
return singleton;
}
- SchedulerFactory() throws Exception {
- executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 100);
+ public SchedulerFactory() throws Exception {
+ executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
}
public void destroy() {
- ExecutorFactory.singleton().shutdown("SchedulerFactory");
+ ExecutorFactory.singleton().shutdown("schedulerFactory");
}
public Scheduler createOrGetFIFOScheduler(String name) {
synchronized (schedulers) {
- if (!schedulers.containsKey(name)) {
+ if (schedulers.containsKey(name) == false) {
Scheduler s = new FIFOScheduler(name, executor, this);
schedulers.put(name, s);
executor.execute(s);
@@ -76,7 +75,7 @@ public class SchedulerFactory implements SchedulerListener {
public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
synchronized (schedulers) {
- if (!schedulers.containsKey(name)) {
+ if (schedulers.containsKey(name) == false) {
Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency);
schedulers.put(name, s);
executor.execute(s);
@@ -87,17 +86,17 @@ public class SchedulerFactory implements SchedulerListener {
public Scheduler createOrGetRemoteScheduler(
String name,
- String sessionId,
- RemoteInterpreter remoteInterpreter,
+ String noteId,
+ RemoteInterpreterProcess interpreterProcess,
int maxConcurrency) {
synchronized (schedulers) {
- if (!schedulers.containsKey(name)) {
+ if (schedulers.containsKey(name) == false) {
Scheduler s = new RemoteScheduler(
name,
executor,
- sessionId,
- remoteInterpreter,
+ noteId,
+ interpreterProcess,
this,
maxConcurrency);
schedulers.put(name, s);
@@ -107,24 +106,38 @@ public class SchedulerFactory implements SchedulerListener {
}
}
- public void removeScheduler(String name) {
+ public Scheduler removeScheduler(String name) {
synchronized (schedulers) {
Scheduler s = schedulers.remove(name);
if (s != null) {
s.stop();
}
}
+ return null;
+ }
+
+ public Collection<Scheduler> listScheduler(String name) {
+ List<Scheduler> s = new LinkedList<>();
+ synchronized (schedulers) {
+ for (Scheduler ss : schedulers.values()) {
+ s.add(ss);
+ }
+ }
+ return s;
}
@Override
public void jobStarted(Scheduler scheduler, Job job) {
- logger.info("Job " + job.getId() + " started by scheduler " + scheduler.getName());
+ logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName());
}
@Override
public void jobFinished(Scheduler scheduler, Job job) {
- logger.info("Job " + job.getId() + " finished by scheduler " + scheduler.getName());
+ logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName());
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
index 1926528..8673476 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.tabledata;
import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
deleted file mode 100644
index 14c03a1..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.util;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Generate Tiny ID.
- */
-public class IdHashes {
- private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', '5', '6', '7', '8', '9',
- 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
- 'W', 'X', 'Y', 'Z'};
-
- /**
- * encodes the given string into the base of the dictionary provided in the constructor.
- *
- * @param value the number to encode.
- * @return the encoded string.
- */
- private static String encode(Long value) {
-
- List<Character> result = new ArrayList<>();
- BigInteger base = new BigInteger("" + DICTIONARY.length);
- int exponent = 1;
- BigInteger remaining = new BigInteger(value.toString());
- while (true) {
- BigInteger a = base.pow(exponent); // 16^1 = 16
- BigInteger b = remaining.mod(a); // 119 % 16 = 7 | 112 % 256 = 112
- BigInteger c = base.pow(exponent - 1);
- BigInteger d = b.divide(c);
-
- // if d > dictionary.length, we have a problem. but BigInteger doesnt have
- // a greater than method :-( hope for the best. theoretically, d is always
- // an index of the dictionary!
- result.add(DICTIONARY[d.intValue()]);
- remaining = remaining.subtract(b); // 119 - 7 = 112 | 112 - 112 = 0
-
- // finished?
- if (remaining.equals(BigInteger.ZERO)) {
- break;
- }
-
- exponent++;
- }
-
- // need to reverse it, since the start of the list contains the least significant values
- StringBuffer sb = new StringBuffer();
- for (int i = result.size() - 1; i >= 0; i--) {
- sb.append(result.get(i));
- }
- return sb.toString();
- }
-
- public static String generateId() {
- return encode(System.currentTimeMillis() + new Random().nextInt());
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
deleted file mode 100644
index 6153f49..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.util;
-
-import org.apache.commons.lang.StringUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * TODO(moon) : add description.
- */
-public class Util {
- private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
- private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev";
- private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
-
- private static Properties projectProperties;
- private static Properties gitProperties;
-
- static {
- projectProperties = new Properties();
- gitProperties = new Properties();
- try {
- projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
- gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
- } catch (IOException e) {
- //Fail to read project.properties
- }
- }
-
- /**
- * Get Zeppelin version
- *
- * @return Current Zeppelin version
- */
- public static String getVersion() {
- return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
- StringUtils.EMPTY);
- }
-
- /**
- * Get Zeppelin Git latest commit id
- *
- * @return Latest Zeppelin commit id
- */
- public static String getGitCommitId() {
- return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
- StringUtils.EMPTY);
- }
-
- /**
- * Get Zeppelin Git latest commit timestamp
- *
- * @return Latest Zeppelin commit timestamp
- */
- public static String getGitTimestamp() {
- return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
- StringUtils.EMPTY);
- }
-}