You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/03 02:41:27 UTC
[8/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component
Refactoring
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index d0025d8..191902a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -41,6 +41,7 @@ public abstract class Job {
/**
* Job status.
*
+ * UNKNOWN - Job is not found in remote
* READY - Job is not running, ready to run.
* PENDING - Job is submitted to scheduler. but not running yet
* RUNNING - Job is running.
@@ -48,8 +49,8 @@ public abstract class Job {
* ERROR - Job finished run. with error
* ABORT - Job finished by abort
*/
- public static enum Status {
- READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
+ public enum Status {
+ UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
public boolean isReady() {
return this == READY;
@@ -70,14 +71,14 @@ public abstract class Job {
Date dateCreated;
Date dateStarted;
Date dateFinished;
- Status status;
+ volatile Status status;
static Logger LOGGER = LoggerFactory.getLogger(Job.class);
transient boolean aborted = false;
- private String errorMessage;
- private transient Throwable exception;
+ private volatile String errorMessage;
+ private transient volatile Throwable exception;
private transient JobListener listener;
private long progressUpdateIntervalMs;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
deleted file mode 100644
index f9ddc4e..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scheduler;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-/**
- * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter
- */
-public class RemoteScheduler implements Scheduler {
- Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
-
- List<Job> queue = new LinkedList<>();
- List<Job> running = new LinkedList<>();
- private ExecutorService executor;
- private SchedulerListener listener;
- boolean terminate = false;
- private String name;
- private int maxConcurrency;
- private final String noteId;
- private RemoteInterpreterProcess interpreterProcess;
-
- public RemoteScheduler(String name, ExecutorService executor, String noteId,
- RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
- int maxConcurrency) {
- this.name = name;
- this.executor = executor;
- this.listener = listener;
- this.noteId = noteId;
- this.interpreterProcess = interpreterProcess;
- this.maxConcurrency = maxConcurrency;
- }
-
- @Override
- public void run() {
- while (terminate == false) {
- Job job = null;
-
- synchronized (queue) {
- if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
- try {
- queue.wait(500);
- } catch (InterruptedException e) {
- logger.error("Exception in RemoteScheduler while run queue.wait", e);
- }
- continue;
- }
-
- job = queue.remove(0);
- running.add(job);
- }
-
- // run
- Scheduler scheduler = this;
- JobRunner jobRunner = new JobRunner(scheduler, job);
- executor.execute(jobRunner);
-
- // wait until it is submitted to the remote
- while (!jobRunner.isJobSubmittedInRemote()) {
- synchronized (queue) {
- try {
- queue.wait(500);
- } catch (InterruptedException e) {
- logger.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
- "queue.wait", e);
- }
- }
- }
- }
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public Collection<Job> getJobsWaiting() {
- List<Job> ret = new LinkedList<>();
- synchronized (queue) {
- for (Job job : queue) {
- ret.add(job);
- }
- }
- return ret;
- }
-
- @Override
- public Job removeFromWaitingQueue(String jobId) {
- synchronized (queue) {
- Iterator<Job> it = queue.iterator();
- while (it.hasNext()) {
- Job job = it.next();
- if (job.getId().equals(jobId)) {
- it.remove();
- return job;
- }
- }
- }
- return null;
- }
-
- @Override
- public Collection<Job> getJobsRunning() {
- List<Job> ret = new LinkedList<>();
- synchronized (queue) {
- for (Job job : running) {
- ret.add(job);
- }
- }
- return ret;
- }
-
- @Override
- public void submit(Job job) {
- if (terminate) {
- throw new RuntimeException("Scheduler already terminated");
- }
- job.setStatus(Status.PENDING);
-
- synchronized (queue) {
- queue.add(job);
- queue.notify();
- }
- }
-
- public void setMaxConcurrency(int maxConcurrency) {
- this.maxConcurrency = maxConcurrency;
- synchronized (queue) {
- queue.notify();
- }
- }
-
- /**
- * Role of the class is get status info from remote process from PENDING to
- * RUNNING status.
- */
- private class JobStatusPoller extends Thread {
- private long initialPeriodMsec;
- private long initialPeriodCheckIntervalMsec;
- private long checkIntervalMsec;
- private boolean terminate;
- private JobListener listener;
- private Job job;
- Status lastStatus;
-
- public JobStatusPoller(long initialPeriodMsec,
- long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
- JobListener listener) {
- this.initialPeriodMsec = initialPeriodMsec;
- this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
- this.checkIntervalMsec = checkIntervalMsec;
- this.job = job;
- this.listener = listener;
- this.terminate = false;
- }
-
- @Override
- public void run() {
- long started = System.currentTimeMillis();
- while (terminate == false) {
- long current = System.currentTimeMillis();
- long interval;
- if (current - started < initialPeriodMsec) {
- interval = initialPeriodCheckIntervalMsec;
- } else {
- interval = checkIntervalMsec;
- }
-
- synchronized (this) {
- try {
- this.wait(interval);
- } catch (InterruptedException e) {
- logger.error("Exception in RemoteScheduler while run this.wait", e);
- }
- }
-
- if (terminate) {
- // terminated by shutdown
- break;
- }
-
- Status newStatus = getStatus();
- if (newStatus == null) { // unknown
- continue;
- }
-
- if (newStatus != Status.READY && newStatus != Status.PENDING) {
- // we don't need more
- break;
- }
- }
- terminate = true;
- }
-
- public void shutdown() {
- terminate = true;
- synchronized (this) {
- this.notify();
- }
- }
-
-
- private Status getLastStatus() {
- if (terminate == true) {
- if (lastStatus != Status.FINISHED &&
- lastStatus != Status.ERROR &&
- lastStatus != Status.ABORT) {
- return Status.FINISHED;
- } else {
- return (lastStatus == null) ? Status.FINISHED : lastStatus;
- }
- } else {
- return (lastStatus == null) ? Status.FINISHED : lastStatus;
- }
- }
-
- public synchronized Job.Status getStatus() {
- if (interpreterProcess.referenceCount() <= 0) {
- return getLastStatus();
- }
-
- Client client;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e) {
- logger.error("Can't get status information", e);
- lastStatus = Status.ERROR;
- return Status.ERROR;
- }
-
- boolean broken = false;
- try {
- String statusStr = client.getStatus(noteId, job.getId());
- if ("Unknown".equals(statusStr)) {
- // not found this job in the remote schedulers.
- // maybe not submitted, maybe already finished
- //Status status = getLastStatus();
- listener.afterStatusChange(job, null, null);
- return job.getStatus();
- }
- Status status = Status.valueOf(statusStr);
- lastStatus = status;
- listener.afterStatusChange(job, null, status);
- return status;
- } catch (TException e) {
- broken = true;
- logger.error("Can't get status information", e);
- lastStatus = Status.ERROR;
- return Status.ERROR;
- } catch (Exception e) {
- logger.error("Unknown status", e);
- lastStatus = Status.ERROR;
- return Status.ERROR;
- } finally {
- interpreterProcess.releaseClient(client, broken);
- }
- }
- }
-
- private class JobRunner implements Runnable, JobListener {
- private Scheduler scheduler;
- private Job job;
- private boolean jobExecuted;
- boolean jobSubmittedRemotely;
-
- public JobRunner(Scheduler scheduler, Job job) {
- this.scheduler = scheduler;
- this.job = job;
- jobExecuted = false;
- jobSubmittedRemotely = false;
- }
-
- public boolean isJobSubmittedInRemote() {
- return jobSubmittedRemotely;
- }
-
- @Override
- public void run() {
- if (job.isAborted()) {
- synchronized (queue) {
- job.setStatus(Status.ABORT);
- job.aborted = false;
-
- running.remove(job);
- queue.notify();
- }
- jobSubmittedRemotely = true;
-
- return;
- }
-
- JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
- job, this);
- jobStatusPoller.start();
-
- if (listener != null) {
- listener.jobStarted(scheduler, job);
- }
- job.run();
-
- jobExecuted = true;
- jobSubmittedRemotely = true;
-
- jobStatusPoller.shutdown();
- try {
- jobStatusPoller.join();
- } catch (InterruptedException e) {
- logger.error("JobStatusPoller interrupted", e);
- }
-
- // set job status based on result.
- Status lastStatus = jobStatusPoller.getStatus();
- Object jobResult = job.getReturn();
- if (jobResult != null && jobResult instanceof InterpreterResult) {
- if (((InterpreterResult) jobResult).code() == Code.ERROR) {
- lastStatus = Status.ERROR;
- }
- }
- if (job.getException() != null) {
- lastStatus = Status.ERROR;
- }
-
- synchronized (queue) {
- job.setStatus(lastStatus);
-
- if (listener != null) {
- listener.jobFinished(scheduler, job);
- }
-
- // reset aborted flag to allow retry
- job.aborted = false;
-
- running.remove(job);
- queue.notify();
- }
- }
-
- @Override
- public void onProgressUpdate(Job job, int progress) {
- }
-
- @Override
- public void beforeStatusChange(Job job, Status before, Status after) {
- }
-
- @Override
- public void afterStatusChange(Job job, Status before, Status after) {
- if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished.
- if (jobExecuted) {
- jobSubmittedRemotely = true;
- Object jobResult = job.getReturn();
- if (job.isAborted()) {
- job.setStatus(Status.ABORT);
- } else if (job.getException() != null) {
- job.setStatus(Status.ERROR);
- } else if (jobResult != null && jobResult instanceof InterpreterResult
- && ((InterpreterResult) jobResult).code() == Code.ERROR) {
- job.setStatus(Status.ERROR);
- } else {
- job.setStatus(Status.FINISHED);
- }
- }
- return;
- }
-
-
- // Update remoteStatus
- if (jobExecuted == false) {
- if (after == Status.FINISHED || after == Status.ABORT
- || after == Status.ERROR) {
- // it can be status of last run.
- // so not updating the remoteStatus
- return;
- } else if (after == Status.RUNNING) {
- jobSubmittedRemotely = true;
- }
- } else {
- jobSubmittedRemotely = true;
- }
-
- // status polled by status poller
- if (job.getStatus() != after) {
- job.setStatus(after);
- }
- }
- }
-
- @Override
- public void stop() {
- terminate = true;
- synchronized (queue) {
- queue.notify();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index af52dec..b629ef7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -17,24 +17,21 @@
package org.apache.zeppelin.scheduler;
-import java.util.Collection;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * TODO(moon) : add description.
+ * Factory class for creating schedulers
+ *
*/
public class SchedulerFactory implements SchedulerListener {
private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
- ExecutorService executor;
- Map<String, Scheduler> schedulers = new LinkedHashMap<>();
+ protected ExecutorService executor;
+ protected Map<String, Scheduler> schedulers = new LinkedHashMap<>();
private static SchedulerFactory singleton;
private static Long singletonLock = new Long(0);
@@ -54,17 +51,17 @@ public class SchedulerFactory implements SchedulerListener {
return singleton;
}
- public SchedulerFactory() throws Exception {
- executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
+ SchedulerFactory() throws Exception {
+ executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 100);
}
public void destroy() {
- ExecutorFactory.singleton().shutdown("schedulerFactory");
+ ExecutorFactory.singleton().shutdown("SchedulerFactory");
}
public Scheduler createOrGetFIFOScheduler(String name) {
synchronized (schedulers) {
- if (schedulers.containsKey(name) == false) {
+ if (!schedulers.containsKey(name)) {
Scheduler s = new FIFOScheduler(name, executor, this);
schedulers.put(name, s);
executor.execute(s);
@@ -75,7 +72,7 @@ public class SchedulerFactory implements SchedulerListener {
public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
synchronized (schedulers) {
- if (schedulers.containsKey(name) == false) {
+ if (!schedulers.containsKey(name)) {
Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency);
schedulers.put(name, s);
executor.execute(s);
@@ -84,60 +81,38 @@ public class SchedulerFactory implements SchedulerListener {
}
}
- public Scheduler createOrGetRemoteScheduler(
- String name,
- String noteId,
- RemoteInterpreterProcess interpreterProcess,
- int maxConcurrency) {
-
+ public Scheduler createOrGetScheduler(Scheduler scheduler) {
synchronized (schedulers) {
- if (schedulers.containsKey(name) == false) {
- Scheduler s = new RemoteScheduler(
- name,
- executor,
- noteId,
- interpreterProcess,
- this,
- maxConcurrency);
- schedulers.put(name, s);
- executor.execute(s);
+ if (!schedulers.containsKey(scheduler.getName())) {
+ schedulers.put(scheduler.getName(), scheduler);
+ executor.execute(scheduler);
}
- return schedulers.get(name);
+ return schedulers.get(scheduler.getName());
}
}
- public Scheduler removeScheduler(String name) {
+ public void removeScheduler(String name) {
synchronized (schedulers) {
Scheduler s = schedulers.remove(name);
if (s != null) {
s.stop();
}
}
- return null;
}
- public Collection<Scheduler> listScheduler(String name) {
- List<Scheduler> s = new LinkedList<>();
- synchronized (schedulers) {
- for (Scheduler ss : schedulers.values()) {
- s.add(ss);
- }
- }
- return s;
+ public ExecutorService getExecutor() {
+ return executor;
}
@Override
public void jobStarted(Scheduler scheduler, Job job) {
- logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName());
+ logger.info("Job " + job.getId() + " started by scheduler " + scheduler.getName());
}
@Override
public void jobFinished(Scheduler scheduler, Job job) {
- logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName());
+ logger.info("Job " + job.getId() + " finished by scheduler " + scheduler.getName());
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
index 8673476..1926528 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.tabledata;
import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
new file mode 100644
index 0000000..14c03a1
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Generate Tiny ID.
+ */
+public class IdHashes {
+ private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', '5', '6', '7', '8', '9',
+ 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
+ 'W', 'X', 'Y', 'Z'};
+
+ /**
+ * encodes the given string into the base of the dictionary provided in the constructor.
+ *
+ * @param value the number to encode.
+ * @return the encoded string.
+ */
+ private static String encode(Long value) {
+
+ List<Character> result = new ArrayList<>();
+ BigInteger base = new BigInteger("" + DICTIONARY.length);
+ int exponent = 1;
+ BigInteger remaining = new BigInteger(value.toString());
+ while (true) {
+ BigInteger a = base.pow(exponent); // 16^1 = 16
+ BigInteger b = remaining.mod(a); // 119 % 16 = 7 | 112 % 256 = 112
+ BigInteger c = base.pow(exponent - 1);
+ BigInteger d = b.divide(c);
+
+ // if d > dictionary.length, we have a problem. but BigInteger doesnt have
+ // a greater than method :-( hope for the best. theoretically, d is always
+ // an index of the dictionary!
+ result.add(DICTIONARY[d.intValue()]);
+ remaining = remaining.subtract(b); // 119 - 7 = 112 | 112 - 112 = 0
+
+ // finished?
+ if (remaining.equals(BigInteger.ZERO)) {
+ break;
+ }
+
+ exponent++;
+ }
+
+ // need to reverse it, since the start of the list contains the least significant values
+ StringBuffer sb = new StringBuffer();
+ for (int i = result.size() - 1; i >= 0; i--) {
+ sb.append(result.get(i));
+ }
+ return sb.toString();
+ }
+
+ public static String generateId() {
+ return encode(System.currentTimeMillis() + new Random().nextInt());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
new file mode 100644
index 0000000..6153f49
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * TODO(moon) : add description.
+ */
+public class Util {
+ private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
+ private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev";
+ private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
+
+ private static Properties projectProperties;
+ private static Properties gitProperties;
+
+ static {
+ projectProperties = new Properties();
+ gitProperties = new Properties();
+ try {
+ projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
+ gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
+ } catch (IOException e) {
+ //Fail to read project.properties
+ }
+ }
+
+ /**
+ * Get Zeppelin version
+ *
+ * @return Current Zeppelin version
+ */
+ public static String getVersion() {
+ return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
+ StringUtils.EMPTY);
+ }
+
+ /**
+ * Get Zeppelin Git latest commit id
+ *
+ * @return Latest Zeppelin commit id
+ */
+ public static String getGitCommitId() {
+ return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
+ StringUtils.EMPTY);
+ }
+
+ /**
+ * Get Zeppelin Git latest commit timestamp
+ *
+ * @return Latest Zeppelin commit timestamp
+ */
+ public static String getGitTimestamp() {
+ return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
+ StringUtils.EMPTY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
deleted file mode 100644
index a7a6eb9..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.zeppelin.interpreter;
-
-import java.util.Properties;
-
-/**
- *
- */
-public class DummyInterpreter extends Interpreter {
-
- public DummyInterpreter(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
-
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- return null;
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return null;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
index e376809..f3a30fb 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
@@ -29,7 +30,7 @@ import org.junit.Test;
public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener {
private File tmpDir;
private File fileChanged;
- private int numChanged;
+ private AtomicInteger numChanged;
private InterpreterOutputChangeWatcher watcher;
@Before
@@ -40,7 +41,7 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
tmpDir.mkdirs();
fileChanged = null;
- numChanged = 0;
+ numChanged = new AtomicInteger(0);
}
@After
@@ -66,7 +67,7 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan
@Test
public void test() throws IOException, InterruptedException {
assertNull(fileChanged);
- assertEquals(0, numChanged);
+ assertEquals(0, numChanged.get());
Thread.sleep(1000);
// create new file
@@ -92,14 +93,14 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan
}
assertNotNull(fileChanged);
- assertEquals(1, numChanged);
+ assertEquals(1, numChanged.get());
}
@Override
public void fileChanged(File file) {
fileChanged = file;
- numChanged++;
+ numChanged.incrementAndGet();
synchronized(this) {
notify();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
index 305268c..31c9225 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
+//TODO(zjffdu) add more test for Interpreter which is a very important class
public class InterpreterTest {
@Test
@@ -85,4 +86,41 @@ public class InterpreterTest {
);
}
+ public static class DummyInterpreter extends Interpreter {
+
+ public DummyInterpreter(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ return null;
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+
+ }
+
+ @Override
+ public FormType getFormType() {
+ return null;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
new file mode 100644
index 0000000..5f7426a
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+public class RemoteInterpreterUtilsTest {
+
+ @Test
+ public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
+ assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/conf/interpreter.json
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/conf/interpreter.json b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
new file mode 100644
index 0000000..45e1d60
--- /dev/null
+++ b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
@@ -0,0 +1,115 @@
+{
+ "interpreterSettings": {
+ "2C3RWCVAG": {
+ "id": "2C3RWCVAG",
+ "name": "test",
+ "group": "test",
+ "properties": {
+ "property_1": "value_1",
+ "property_2": "new_value_2",
+ "property_3": "value_3"
+ },
+ "status": "READY",
+ "interpreterGroup": [
+ {
+ "name": "echo",
+ "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
+ "defaultInterpreter": true,
+ "editor": {
+ "language": "java",
+ "editOnDblClick": false
+ }
+ }
+ ],
+ "dependencies": [],
+ "option": {
+ "remote": true,
+ "port": -1,
+ "perNote": "shared",
+ "perUser": "shared",
+ "isExistingProcess": false,
+ "setPermission": false,
+ "users": [],
+ "isUserImpersonate": false
+ }
+ },
+
+ "2CKWE7B19": {
+ "id": "2CKWE7B19",
+ "name": "test2",
+ "group": "test",
+ "properties": {
+ "property_1": "value_1",
+ "property_2": "new_value_2",
+ "property_3": "value_3"
+ },
+ "status": "READY",
+ "interpreterGroup": [
+ {
+ "name": "echo",
+ "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
+ "defaultInterpreter": true,
+ "editor": {
+ "language": "java",
+ "editOnDblClick": false
+ }
+ }
+ ],
+ "dependencies": [],
+ "option": {
+ "remote": true,
+ "port": -1,
+ "perNote": "shared",
+ "perUser": "shared",
+ "isExistingProcess": false,
+ "setPermission": false,
+ "users": [],
+ "isUserImpersonate": false
+ }
+ }
+ },
+ "interpreterBindings": {
+ "2C6793KRV": [
+ "2C48Y7FSJ",
+ "2C63XW4XE",
+ "2C66GE1VB",
+ "2C5VH924X",
+ "2C4BJDRRZ",
+ "2C3SQSB7V",
+ "2C4HKDCQW",
+ "2C3DR183X",
+ "2C66Z9XPQ",
+ "2C3PTPMUH",
+ "2C69WE69N",
+ "2C5SRRXHM",
+ "2C4ZD49PF",
+ "2C6V3D44K",
+ "2C4UB1UZA",
+ "2C5S1R21W",
+ "2C5DCRVGM",
+ "2C686X8ZH",
+ "2C3RWCVAG",
+ "2C3JKFMJU",
+ "2C3VECEG2"
+ ]
+ },
+ "interpreterRepositories": [
+ {
+ "id": "central",
+ "type": "default",
+ "url": "http://repo1.maven.org/maven2/",
+ "releasePolicy": {
+ "enabled": true,
+ "updatePolicy": "daily",
+ "checksumPolicy": "warn"
+ },
+ "snapshotPolicy": {
+ "enabled": true,
+ "updatePolicy": "daily",
+ "checksumPolicy": "warn"
+ },
+ "mirroredRepositories": [],
+ "repositoryManager": false
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
new file mode 100644
index 0000000..1ba1b94
--- /dev/null
+++ b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
@@ -0,0 +1,42 @@
+[
+ {
+ "group": "test",
+ "name": "double_echo",
+ "className": "org.apache.zeppelin.interpreter.DoubleEchoInterpreter",
+ "properties": {
+ "property_1": {
+ "envName": "PROPERTY_1",
+ "propertyName": "property_1",
+ "defaultValue": "value_1",
+ "description": "desc_1"
+ },
+ "property_2": {
+ "envName": "PROPERTY_2",
+ "propertyName": "property_2",
+ "defaultValue": "value_2",
+ "description": "desc_2"
+ }
+ }
+ },
+
+ {
+ "group": "test",
+ "name": "echo",
+ "defaultInterpreter": true,
+ "className": "org.apache.zeppelin.interpreter.EchoInterpreter",
+ "properties": {
+ "property_1": {
+ "envName": "PROPERTY_1",
+ "propertyName": "property_1",
+ "defaultValue": "value_1",
+ "description": "desc_1"
+ },
+ "property_2": {
+ "envName": "PROPERTY_2",
+ "propertyName": "property_2",
+ "defaultValue": "value_2",
+ "description": "desc_2"
+ }
+ }
+ }
+]
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties b/zeppelin-interpreter/src/test/resources/log4j.properties
index d8a7839..6f34691 100644
--- a/zeppelin-interpreter/src/test/resources/log4j.properties
+++ b/zeppelin-interpreter/src/test/resources/log4j.properties
@@ -26,4 +26,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
#
# Root logger option
-log4j.rootLogger=INFO, stdout
\ No newline at end of file
+log4j.rootLogger=INFO, stdout
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.logger.org.apache.zeppelin.scheduler=DEBUG
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index cd0210e..c1dba5c 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -185,7 +185,7 @@ public class InterpreterRestApi {
String noteId = request == null ? null : request.getNoteId();
if (null == noteId) {
- interpreterSettingManager.close(setting);
+ interpreterSettingManager.close(settingId);
} else {
interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal());
}
@@ -208,7 +208,7 @@ public class InterpreterRestApi {
@GET
@ZeppelinApi
public Response listInterpreter(String message) {
- Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings();
+ Map<String, InterpreterSetting> m = interpreterSettingManager.getInterpreterSettingTemplates();
return new JsonResponse<>(Status.OK, "", m).build();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 7453470..c103eeb 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -31,12 +31,10 @@ import org.apache.shiro.web.env.EnvironmentLoaderListener;
import org.apache.shiro.web.servlet.ShiroFilter;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.helium.Helium;
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.helium.HeliumBundleFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
-import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.notebook.Notebook;
@@ -93,13 +91,11 @@ public class ZeppelinServer extends Application {
private NotebookRepoSync notebookRepo;
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
- private DependencyResolver depResolver;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
- this.depResolver = new DependencyResolver(
- conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
+
InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
@@ -129,13 +125,26 @@ public class ZeppelinServer extends Application {
new File(conf.getRelativeDir("zeppelin-web/src/app/spell")));
}
+ this.schedulerFactory = SchedulerFactory.singleton();
+ this.interpreterSettingManager = new InterpreterSettingManager(conf, notebookWsServer,
+ notebookWsServer, notebookWsServer);
+ this.replFactory = new InterpreterFactory(interpreterSettingManager);
+ this.notebookRepo = new NotebookRepoSync(conf);
+ this.noteSearchService = new LuceneSearch();
+ this.notebookAuthorization = NotebookAuthorization.init(conf);
+ this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
+ notebook = new Notebook(conf,
+ notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
+ noteSearchService, notebookAuthorization, credentials);
+
ZeppelinServer.helium = new Helium(
conf.getHeliumConfPath(),
conf.getHeliumRegistry(),
new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO),
"helium-registry-cache"),
heliumBundleFactory,
- heliumApplicationFactory);
+ heliumApplicationFactory,
+ interpreterSettingManager);
// create bundle
try {
@@ -144,20 +153,6 @@ public class ZeppelinServer extends Application {
LOG.error(e.getMessage(), e);
}
- this.schedulerFactory = new SchedulerFactory();
- this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver,
- new InterpreterOption(true));
- this.replFactory = new InterpreterFactory(conf, notebookWsServer,
- notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(),
- interpreterSettingManager);
- this.notebookRepo = new NotebookRepoSync(conf);
- this.noteSearchService = new LuceneSearch();
- this.notebookAuthorization = NotebookAuthorization.init(conf);
- this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
- notebook = new Notebook(conf,
- notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
- noteSearchService, notebookAuthorization, credentials);
-
// to update notebook from application event from remote process.
heliumApplicationFactory.setNotebook(notebook);
// to update fire websocket event on application event.
@@ -206,7 +201,7 @@ public class ZeppelinServer extends Application {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
- notebook.getInterpreterSettingManager().shutdown();
+ notebook.getInterpreterSettingManager().close();
notebook.close();
Thread.sleep(3000);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index f0e0bb2..2e3a5c7 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -41,12 +41,7 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.HeliumPackage;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -59,7 +54,6 @@ import org.apache.zeppelin.notebook.NotebookEventListener;
import org.apache.zeppelin.notebook.NotebookImportDeserializer;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.ParagraphJobListener;
-import org.apache.zeppelin.notebook.ParagraphRuntimeInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
@@ -86,8 +80,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Queues;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
/**
@@ -463,7 +455,8 @@ public class NotebookServer extends WebSocketServlet
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
- List<String> ids = notebook.getInterpreterSettingManager().getInterpreters(note.getId());
+ List<String> ids = notebook.getInterpreterSettingManager()
+ .getInterpreterBinding(note.getId());
for (String id : ids) {
if (id.equals(interpreterGroupId)) {
broadcast(note.getId(), m);
@@ -1022,7 +1015,7 @@ public class NotebookServer extends WebSocketServlet
List<String> interpreterSettingIds = new LinkedList<>();
interpreterSettingIds.add(defaultInterpreterId);
for (String interpreterSettingId : notebook.getInterpreterSettingManager().
- getDefaultInterpreterSettingList()) {
+ getInterpreterSettingIds()) {
if (!interpreterSettingId.equals(defaultInterpreterId)) {
interpreterSettingIds.add(interpreterSettingId);
}
@@ -1382,12 +1375,13 @@ public class NotebookServer extends WebSocketServlet
List<InterpreterSetting> settings =
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
- if (setting.getInterpreterGroup(user, note.getId()) == null) {
+ if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) {
continue;
}
- if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId()).getId())) {
+ if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, note.getId())
+ .getId())) {
AngularObjectRegistry angularObjectRegistry =
- setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
+ setting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
// first trying to get local registry
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
@@ -1424,12 +1418,13 @@ public class NotebookServer extends WebSocketServlet
List<InterpreterSetting> settings =
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
- if (setting.getInterpreterGroup(user, n.getId()) == null) {
+ if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) {
continue;
}
- if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId()).getId())) {
+ if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId())
+ .getId())) {
AngularObjectRegistry angularObjectRegistry =
- setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
+ setting.getOrCreateInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
this.broadcastExcept(n.getId(),
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId())
@@ -2302,13 +2297,13 @@ public class NotebookServer extends WebSocketServlet
for (InterpreterSetting intpSetting : settings) {
AngularObjectRegistry registry =
- intpSetting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
+ intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
List<AngularObject> objects = registry.getAllWithGlobal(note.getId());
for (AngularObject object : objects) {
conn.send(serializeMessage(
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
.put("interpreterGroupId",
- intpSetting.getInterpreterGroup(user, note.getId()).getId())
+ intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getId())
.put("noteId", note.getId()).put("paragraphId", object.getParagraphId())));
}
}
@@ -2354,7 +2349,7 @@ public class NotebookServer extends WebSocketServlet
}
List<String> settingIds =
- notebook.getInterpreterSettingManager().getInterpreters(note.getId());
+ notebook.getInterpreterSettingManager().getInterpreterBinding(note.getId());
for (String id : settingIds) {
if (interpreterGroupId.contains(id)) {
broadcast(note.getId(),
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
deleted file mode 100644
index 1b1306a..0000000
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.mock;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-public class MockInterpreter1 extends Interpreter{
- Map<String, Object> vars = new HashMap<>();
-
- public MockInterpreter1(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st);
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- }
-
- @Override
- public FormType getFormType() {
- return FormType.SIMPLE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index a7907db..e2f171f 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -307,10 +307,9 @@ public abstract class AbstractTestRestApi {
protected static void shutDown() throws Exception {
if (!wasRunning) {
// restart interpreter to stop all interpreter processes
- List<String> settingList = ZeppelinServer.notebook.getInterpreterSettingManager()
- .getDefaultInterpreterSettingList();
- for (String setting : settingList) {
- ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting);
+ List<InterpreterSetting> settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get();
+ for (InterpreterSetting setting : settingList) {
+ ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
}
if (shiroIni != null) {
FileUtils.deleteQuietly(shiroIni);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
index 28541bd..72dd8a7 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
@@ -80,7 +80,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
// then
assertThat(get, isAllowed());
- assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getAvailableInterpreterSettings().size(),
+ assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getInterpreterSettingTemplates().size(),
body.entrySet().size());
get.releaseConnection();
}
@@ -110,7 +110,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
@Test
public void testSettingsCRUD() throws IOException {
// when: call create setting API
- String rawRequest = "{\"name\":\"md2\",\"group\":\"md\"," +
+ String rawRequest = "{\"name\":\"md3\",\"group\":\"md\"," +
"\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": \"propname\", \"type\": \"textarea\"}}," +
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
"\"dependencies\":[]," +
@@ -367,7 +367,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
@Test
public void testGetMetadataInfo() throws IOException {
- String jsonRequest = "{\"name\":\"spark\",\"group\":\"spark\"," +
+ String jsonRequest = "{\"name\":\"spark_new\",\"group\":\"spark\"," +
"\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": \"propname\", \"type\": \"textarea\"}}," +
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
"\"dependencies\":[]," +
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 8da36a6..10d77b2 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.AfterClass;
@@ -95,7 +96,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
}
@Test
- public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() throws IOException {
+ public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() throws IOException, InterruptedException {
// create a notebook
Note note1 = notebook.createNote(anonymous);
@@ -104,7 +105,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId());
for (InterpreterSetting setting : settings) {
if (setting.getName().equals("md")) {
- interpreterGroup = setting.getInterpreterGroup("anonymous", "sharedProcess");
+ interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess");
break;
}
}
@@ -115,6 +116,14 @@ public class NotebookServerTest extends AbstractTestRestApi {
p1.setAuthenticationInfo(anonymous);
note1.run(p1.getId());
+ // wait for paragraph finished
+ while(true) {
+ if (p1.getStatus() == Job.Status.FINISHED) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
// add angularObject
interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties
index b0d1067..8368993 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -33,7 +33,6 @@ log4j.logger.org.apache.hadoop.mapred=WARN
log4j.logger.org.apache.hadoop.hive.ql=WARN
log4j.logger.org.apache.hadoop.hive.metastore=WARN
log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN
-log4j.logger.org.apache.zeppelin.scheduler=WARN
log4j.logger.org.quartz=WARN
log4j.logger.DataNucleus=WARN
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index f00fe93..03cc069 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -17,22 +17,21 @@
package org.apache.zeppelin.conf;
-import java.io.File;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.configuration.tree.ConfigurationNode;
import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.notebook.repo.GitNotebookRepo;
import org.apache.zeppelin.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* Zeppelin configuration.
*
@@ -528,7 +527,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ConfigurationKeyPredicate predicate) {
Map<String, String> configurations = new HashMap<>();
- for (ZeppelinConfiguration.ConfVars v : ZeppelinConfiguration.ConfVars.values()) {
+ for (ConfVars v : ConfVars.values()) {
String key = v.getVarName();
if (!predicate.apply(key)) {
@@ -653,7 +652,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"),
ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"),
ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false),
- ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", GitNotebookRepo.class.getName()),
+ ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage",
+ "org.apache.zeppelin.notebook.repo.GitNotebookRepo"),
ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
// whether by default note is public or private
ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true),
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
index 17a3529..e5f2e3b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
@@ -16,14 +16,17 @@
*/
package org.apache.zeppelin.helium;
+import com.google.gson.Gson;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.notebook.Paragraph;
-import org.apache.zeppelin.resource.DistributedResourcePool;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
-import org.apache.zeppelin.resource.ResourceSet;
+import org.apache.zeppelin.resource.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,19 +50,22 @@ public class Helium {
private final HeliumBundleFactory bundleFactory;
private final HeliumApplicationFactory applicationFactory;
+ private final InterpreterSettingManager interpreterSettingManager;
public Helium(
String heliumConfPath,
String registryPaths,
File registryCacheDir,
HeliumBundleFactory bundleFactory,
- HeliumApplicationFactory applicationFactory)
+ HeliumApplicationFactory applicationFactory,
+ InterpreterSettingManager interpreterSettingManager)
throws IOException {
this.heliumConfPath = heliumConfPath;
this.registryPaths = registryPaths;
this.registryCacheDir = registryCacheDir;
this.bundleFactory = bundleFactory;
this.applicationFactory = applicationFactory;
+ this.interpreterSettingManager = interpreterSettingManager;
heliumConf = loadConf(heliumConfPath);
allPackages = getAllPackageInfo();
}
@@ -350,7 +356,7 @@ public class Helium {
allResources = resourcePool.getAll();
}
} else {
- allResources = ResourcePoolUtils.getAllResources();
+ allResources = interpreterSettingManager.getAllResources();
}
for (List<HeliumPackageSearchResult> pkgs : allPackages.values()) {
@@ -478,4 +484,40 @@ public class Helium {
return mixed;
}
+
+ public ResourceSet getAllResources() {
+ return getAllResourcesExcept(null);
+ }
+
+ private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
+ ResourceSet resourceSet = new ResourceSet();
+ for (ManagedInterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) {
+ if (interpreterGroupExcludsion != null &&
+ intpGroup.getId().equals(interpreterGroupExcludsion)) {
+ continue;
+ }
+
+ RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+ if (remoteInterpreterProcess == null) {
+ ResourcePool localPool = intpGroup.getResourcePool();
+ if (localPool != null) {
+ resourceSet.addAll(localPool.getAll());
+ }
+ } else if (remoteInterpreterProcess.isRunning()) {
+ List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+ @Override
+ public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+ return client.resourcePoolGetAll();
+ }
+ }
+ );
+ Gson gson = new Gson();
+ for (String res : resourceList) {
+ resourceSet.add(gson.fromJson(res, Resource.class));
+ }
+ }
+ }
+ return resourceSet;
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
index 84368a7..f4dfae3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
@@ -16,7 +16,6 @@
*/
package org.apache.zeppelin.helium;
-import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
@@ -80,7 +79,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
try {
// get interpreter process
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
- InterpreterGroup intpGroup = intp.getInterpreterGroup();
+ ManagedInterpreterGroup intpGroup = (ManagedInterpreterGroup) intp.getInterpreterGroup();
RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
@@ -105,38 +104,33 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState)
throws Exception {
- RemoteInterpreterService.Client client = null;
-
synchronized (appState) {
if (appState.getStatus() == ApplicationState.Status.LOADED) {
// already loaded
return;
}
- try {
- appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING);
- String pkgInfo = pkg.toJson();
- String appId = appState.getId();
-
- client = intpProcess.getClient();
- RemoteApplicationResult ret = client.loadApplication(
- appId,
- pkgInfo,
- paragraph.getNote().getId(),
- paragraph.getId());
-
- if (ret.isSuccess()) {
- appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED);
- } else {
- throw new ApplicationException(ret.getMsg());
- }
- } catch (TException e) {
- intpProcess.releaseBrokenClient(client);
- throw e;
- } finally {
- if (client != null) {
- intpProcess.releaseClient(client);
- }
+ appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING);
+ final String pkgInfo = pkg.toJson();
+ final String appId = appState.getId();
+
+ RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+ @Override
+ public RemoteApplicationResult call(RemoteInterpreterService.Client client)
+ throws Exception {
+ return client.loadApplication(
+ appId,
+ pkgInfo,
+ paragraph.getNote().getId(),
+ paragraph.getId());
+ }
+ }
+ );
+ if (ret.isSuccess()) {
+ appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED);
+ } else {
+ throw new ApplicationException(ret.getMsg());
}
}
}
@@ -199,7 +193,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
}
- private void unload(ApplicationState appsToUnload) throws ApplicationException {
+ private void unload(final ApplicationState appsToUnload) throws ApplicationException {
synchronized (appsToUnload) {
if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) {
throw new ApplicationException(
@@ -212,31 +206,24 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
RemoteInterpreterProcess intpProcess =
- intp.getInterpreterGroup().getRemoteInterpreterProcess();
+ ((ManagedInterpreterGroup) intp.getInterpreterGroup()).getRemoteInterpreterProcess();
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
}
- RemoteInterpreterService.Client client;
- try {
- client = intpProcess.getClient();
- } catch (Exception e) {
- throw new ApplicationException(e);
- }
-
- try {
- RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
-
- if (ret.isSuccess()) {
- appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED);
- } else {
- throw new ApplicationException(ret.getMsg());
- }
- } catch (TException e) {
- intpProcess.releaseBrokenClient(client);
- throw new ApplicationException(e);
- } finally {
- intpProcess.releaseClient(client);
+ RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+ @Override
+ public RemoteApplicationResult call(RemoteInterpreterService.Client client)
+ throws Exception {
+ return client.unloadApplication(appsToUnload.getId());
+ }
+ }
+ );
+ if (ret.isSuccess()) {
+ appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED);
+ } else {
+ throw new ApplicationException(ret.getMsg());
}
}
}
@@ -286,7 +273,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
}
- private void run(ApplicationState app) throws ApplicationException {
+ private void run(final ApplicationState app) throws ApplicationException {
synchronized (app) {
if (app.getStatus() != ApplicationState.Status.LOADED) {
throw new ApplicationException(
@@ -299,33 +286,23 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
RemoteInterpreterProcess intpProcess =
- intp.getInterpreterGroup().getRemoteInterpreterProcess();
+ ((ManagedInterpreterGroup) intp.getInterpreterGroup()).getRemoteInterpreterProcess();
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
}
- RemoteInterpreterService.Client client = null;
- try {
- client = intpProcess.getClient();
- } catch (Exception e) {
- throw new ApplicationException(e);
- }
-
- try {
- RemoteApplicationResult ret = client.runApplication(app.getId());
-
- if (ret.isSuccess()) {
- // success
- } else {
- throw new ApplicationException(ret.getMsg());
- }
- } catch (TException e) {
- intpProcess.releaseBrokenClient(client);
- client = null;
- throw new ApplicationException(e);
- } finally {
- if (client != null) {
- intpProcess.releaseClient(client);
- }
+ RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+ @Override
+ public RemoteApplicationResult call(RemoteInterpreterService.Client client)
+ throws Exception {
+ return client.runApplication(app.getId());
+ }
+ }
+ );
+ if (ret.isSuccess()) {
+ // success
+ } else {
+ throw new ApplicationException(ret.getMsg());
}
}
}