You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/04/12 07:58:42 UTC
[1/3] incubator-zeppelin git commit: ZEPPELIN-25 Ability to create
rich gui inside of Notebook
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master c0a7d08c5 -> 58b70e3bc
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
new file mode 100644
index 0000000..31b534e
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ *
+ */
+public class ExecutorFactory {
+ private static ExecutorFactory _executor;
+ private static Long _executorLock = new Long(0);
+
+ Map<String, ExecutorService> executor = new HashMap<String, ExecutorService>();
+
+ public ExecutorFactory() {
+
+ }
+
+ public static ExecutorFactory singleton() {
+ if (_executor == null) {
+ synchronized (_executorLock) {
+ if (_executor == null) {
+ _executor = new ExecutorFactory();
+ }
+ }
+ }
+ return _executor;
+ }
+
+ public ExecutorService getDefaultExecutor() {
+ return createOrGet("default");
+ }
+
+ public ExecutorService createOrGet(String name) {
+ return createOrGet(name, 100);
+ }
+
+ public ExecutorService createOrGet(String name, int numThread) {
+ synchronized (executor) {
+ if (!executor.containsKey(name)) {
+ executor.put(name, Executors.newScheduledThreadPool(numThread));
+ }
+ return executor.get(name);
+ }
+ }
+
+ public void shutdown(String name) {
+ synchronized (executor) {
+ if (executor.containsKey(name)) {
+ ExecutorService e = executor.get(name);
+ e.shutdown();
+ executor.remove(name);
+ }
+ }
+ }
+
+
+ public void shutdownAll() {
+ synchronized (executor) {
+ for (String name : executor.keySet()){
+ shutdown(name);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 2556a81..71769b4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -22,8 +22,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.slf4j.Logger;
@@ -37,7 +36,7 @@ import org.slf4j.LoggerFactory;
*/
public class SchedulerFactory implements SchedulerListener {
private final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
- ScheduledExecutorService executor;
+ ExecutorService executor;
Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>();
private static SchedulerFactory singleton;
@@ -59,11 +58,11 @@ public class SchedulerFactory implements SchedulerListener {
}
public SchedulerFactory() throws Exception {
- executor = Executors.newScheduledThreadPool(100);
+ executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
}
public void destroy() {
- executor.shutdown();
+ ExecutorFactory.singleton().shutdown("schedulerFactory");
}
public Scheduler createOrGetFIFOScheduler(String name) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 051730e..f9cd181 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -24,7 +24,8 @@ struct RemoteInterpreterContext {
2: string paragraphTitle,
3: string paragraphText,
4: string config, // json serialized config
- 5: string gui // json serialized gui
+ 5: string gui, // json serialized gui
+ 6: string runners // json serialized runner
}
struct RemoteInterpreterResult {
@@ -35,6 +36,19 @@ struct RemoteInterpreterResult {
5: string gui // json serialized gui
}
+enum RemoteInterpreterEventType {
+ NO_OP = 1,
+ ANGULAR_OBJECT_ADD = 2,
+ ANGULAR_OBJECT_UPDATE = 3,
+ ANGULAR_OBJECT_REMOVE = 4,
+ RUN_INTERPRETER_CONTEXT_RUNNER = 5
+}
+
+struct RemoteInterpreterEvent {
+ 1: RemoteInterpreterEventType type,
+ 2: string data // json serialized data
+}
+
service RemoteInterpreterService {
void createInterpreter(1: string className, 2: map<string, string> properties);
@@ -48,4 +62,7 @@ service RemoteInterpreterService {
void shutdown();
string getStatus(1:string jobId);
+
+ RemoteInterpreterEvent getEvent();
+ void angularObjectUpdate(1: string name, 2: string object);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
new file mode 100644
index 0000000..b0ed45f
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+public class AngularObjectRegistryTest {
+
+ @Test
+ public void testBasic() {
+ final AtomicInteger onAdd = new AtomicInteger(0);
+ final AtomicInteger onUpdate = new AtomicInteger(0);
+ final AtomicInteger onRemove = new AtomicInteger(0);
+
+ AngularObjectRegistry registry = new AngularObjectRegistry("intpId",
+ new AngularObjectRegistryListener() {
+
+ @Override
+ public void onAdd(String interpreterGroupId, AngularObject object) {
+ onAdd.incrementAndGet();
+ }
+
+ @Override
+ public void onUpdate(String interpreterGroupId, AngularObject object) {
+ onUpdate.incrementAndGet();
+ }
+
+ @Override
+ public void onRemove(String interpreterGroupId, AngularObject object) {
+ onRemove.incrementAndGet();
+ }
+ });
+
+ registry.add("name1", "value1");
+ assertEquals(1, registry.getAll().size());
+ assertEquals(1, onAdd.get());
+ assertEquals(0, onUpdate.get());
+
+ registry.get("name1").set("newValue");
+ assertEquals(1, onUpdate.get());
+
+ registry.remove("name1");
+ assertEquals(0, registry.getAll().size());
+ assertEquals(1, onRemove.get());
+
+ assertEquals(null, registry.get("name1"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
new file mode 100644
index 0000000..7ccc934
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.junit.Test;
+
+public class AngularObjectTest {
+
+ @Test
+ public void testListener() {
+ final AtomicInteger updated = new AtomicInteger(0);
+ AngularObject ao = new AngularObject("name", "value", new AngularObjectListener() {
+
+ @Override
+ public void updated(AngularObject updatedObject) {
+ updated.incrementAndGet();
+ }
+
+ });
+
+ assertEquals(0, updated.get());
+ ao.set("newValue");
+ assertEquals(1, updated.get());
+ assertEquals("newValue", ao.get());
+
+ ao.set("newValue");
+ assertEquals(2, updated.get());
+
+ ao.set("newnewValue", false);
+ assertEquals(2, updated.get());
+ assertEquals("newnewValue", ao.get());
+ }
+
+ @Test
+ public void testWatcher() throws InterruptedException {
+ final AtomicInteger updated = new AtomicInteger(0);
+ final AtomicInteger onWatch = new AtomicInteger(0);
+ AngularObject ao = new AngularObject("name", "value", new AngularObjectListener() {
+ @Override
+ public void updated(AngularObject updatedObject) {
+ updated.incrementAndGet();
+ }
+ });
+
+ ao.addWatcher(new AngularObjectWatcher(null) {
+ @Override
+ public void watch(Object oldObject, Object newObject, InterpreterContext context) {
+ onWatch.incrementAndGet();
+ }
+ });
+
+ assertEquals(0, onWatch.get());
+ ao.set("newValue");
+
+ Thread.sleep(500);
+ assertEquals(1, onWatch.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
new file mode 100644
index 0000000..d4909e3
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
+ private InterpreterGroup intpGroup;
+ private HashMap<String, String> env;
+ private RemoteInterpreter intp;
+ private InterpreterContext context;
+ private RemoteAngularObjectRegistry localRegistry;
+
+ private AtomicInteger onAdd;
+ private AtomicInteger onUpdate;
+ private AtomicInteger onRemove;
+
+ @Before
+ public void setUp() throws Exception {
+ onAdd = new AtomicInteger(0);
+ onUpdate = new AtomicInteger(0);
+ onRemove = new AtomicInteger(0);
+
+ intpGroup = new InterpreterGroup("intpId");
+ localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
+ intpGroup.setAngularObjectRegistry(localRegistry);
+ env = new HashMap<String, String>();
+ env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+ Properties p = new Properties();
+
+ intp = new RemoteInterpreter(
+ p,
+ MockInterpreterAngular.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env
+ );
+
+ intpGroup.add(intp);
+ intp.setInterpreterGroup(intpGroup);
+
+ context = new InterpreterContext(
+ "id",
+ "title",
+ "text",
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>());
+
+ intp.open();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ intp.close();
+ intpGroup.clone();
+ intpGroup.destroy();
+ }
+
+ @Test
+ public void testAngularObjectCRUD() throws InterruptedException {
+ InterpreterResult ret = intp.interpret("get", context);
+ Thread.sleep(500); // waitFor eventpoller pool event
+ String[] result = ret.message().split(" ");
+ assertEquals("0", result[0]); // size of registry
+ assertEquals("0", result[1]); // num watcher called
+
+ // create object
+ ret = intp.interpret("add n1 v1", context);
+ Thread.sleep(500);
+ result = ret.message().split(" ");
+ assertEquals("1", result[0]); // size of registry
+ assertEquals("0", result[1]); // num watcher called
+ assertEquals("v1", localRegistry.get("n1").get());
+
+ // update object
+ ret = intp.interpret("update n1 v11", context);
+ result = ret.message().split(" ");
+ Thread.sleep(500);
+ assertEquals("1", result[0]); // size of registry
+ assertEquals("1", result[1]); // num watcher called
+ assertEquals("v11", localRegistry.get("n1").get());
+
+ // remove object
+ ret = intp.interpret("remove n1", context);
+ result = ret.message().split(" ");
+ Thread.sleep(500);
+ assertEquals("0", result[0]); // size of registry
+ assertEquals("1", result[1]); // num watcher called
+ assertEquals(null, localRegistry.get("n1"));
+ }
+
+ @Override
+ public void onAdd(String interpreterGroupId, AngularObject object) {
+ onAdd.incrementAndGet();
+ }
+
+ @Override
+ public void onUpdate(String interpreterGroupId, AngularObject object) {
+ onUpdate.incrementAndGet();
+ }
+
+ @Override
+ public void onRemove(String interpreterGroupId, AngularObject object) {
+ onRemove.incrementAndGet();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
index 02dc224..fcd6847 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertFalse;
import java.util.HashMap;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.junit.Test;
@@ -30,11 +30,13 @@ public class RemoteInterpreterProcessTest {
@Test
public void testStartStop() {
- RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
+ new InterpreterContextRunnerPool());
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
- assertEquals(1, rip.reference());
- assertEquals(2, rip.reference());
+ assertEquals(1, rip.reference(intpGroup));
+ assertEquals(2, rip.reference(intpGroup));
assertEquals(true, rip.isRunning());
assertEquals(1, rip.dereference());
assertEquals(true, rip.isRunning());
@@ -44,8 +46,10 @@ public class RemoteInterpreterProcessTest {
@Test
public void testClientFactory() throws Exception {
- RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
- rip.reference();
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
+ new InterpreterContextRunnerPool());
+ rip.reference(intpGroup);
assertEquals(0, rip.getNumActiveClient());
assertEquals(0, rip.getNumIdleClient());
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 58299bb..e743eab 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -30,12 +30,12 @@ import java.util.Map;
import java.util.Properties;
import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
import org.apache.zeppelin.scheduler.Job;
@@ -109,7 +109,9 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
- new GUI()));
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>()));
intpB.open();
assertEquals(2, process.referenceCount());
@@ -159,7 +161,9 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
- new GUI()));
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>()));
assertEquals("500", ret.message());
ret = intpB.interpret("500",
@@ -168,7 +172,9 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
- new GUI()));
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>()));
assertEquals("1000", ret.message());
long end = System.currentTimeMillis();
assertTrue(end - start >= 1000);
@@ -231,7 +237,9 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
- new GUI()));
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>()));
}
@Override
@@ -262,7 +270,9 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
- new GUI()));
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>()));
}
@Override
@@ -333,7 +343,9 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
- new GUI()));
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>()));
synchronized (results) {
results.add(ret.message());
@@ -413,7 +425,9 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
- new GUI()));
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>()));
synchronized (results) {
results.add(ret.message());
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
new file mode 100644
index 0000000..ff1b8ed
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+
+public class MockInterpreterAngular extends Interpreter {
+ static {
+ Interpreter.register(
+ "angularTest",
+ "angular",
+ MockInterpreterA.class.getName(),
+ new InterpreterPropertyBuilder()
+ .add("p1", "v1", "property1").build());
+
+ }
+
+ AtomicInteger numWatch = new AtomicInteger(0);
+
+ public MockInterpreterAngular(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ String[] stmt = st.split(" ");
+ String cmd = stmt[0];
+ String name = null;
+ if (stmt.length >= 2) {
+ name = stmt[1];
+ }
+ String value = null;
+ if (stmt.length == 3) {
+ value = stmt[2];
+ }
+
+ AngularObjectRegistry registry = context.getAngularObjectRegistry();
+
+ if (cmd.equals("add")) {
+ registry.add(name, value);
+ registry.get(name).addWatcher(new AngularObjectWatcher(null) {
+
+ @Override
+ public void watch(Object oldObject, Object newObject,
+ InterpreterContext context) {
+ numWatch.incrementAndGet();
+ }
+
+ });
+ } else if (cmd.equalsIgnoreCase("update")) {
+ registry.get(name).set(value);
+ } else if (cmd.equals("remove")) {
+ registry.remove(name);
+ }
+
+ try {
+ Thread.sleep(500); // wait for watcher executed
+ } catch (InterruptedException e) {
+ }
+
+ String msg = registry.getAll().size() + " " + Integer.toString(numWatch.get());
+ return new InterpreterResult(Code.SUCCESS, msg);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<String> completion(String buf, int cursor) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index 2c13ab2..bb0fb80 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -21,17 +21,17 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,7 +53,7 @@ public class RemoteSchedulerTest {
@Test
public void test() throws Exception {
Properties p = new Properties();
- InterpreterGroup intpGroup = new InterpreterGroup();
+ final InterpreterGroup intpGroup = new InterpreterGroup();
Map<String, String> env = new HashMap<String, String>();
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
@@ -93,7 +93,9 @@ public class RemoteSchedulerTest {
"title",
"text",
new HashMap<String, Object>(),
- new GUI()));
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>()));
return "1000";
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 909cc8f..bd55b2d 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -298,7 +298,7 @@ public class ZeppelinServer extends Application {
this.schedulerFactory = new SchedulerFactory();
- this.replFactory = new InterpreterFactory(conf);
+ this.replFactory = new InterpreterFactory(conf, notebookServer);
notebook = new Notebook(conf, schedulerFactory, replFactory, notebookServer);
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
index a7b8b66..e4626bf 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
@@ -90,6 +90,11 @@ public class Message {
// @param notes serialized List<NoteInfo> object
PARAGRAPH_REMOVE,
+
+ ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object
+ ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del
+
+ ANGULAR_OBJECT_UPDATED // [c-s] angular object value updated
}
public OP op;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 415e8c1..69d62d8 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -25,7 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
@@ -50,7 +54,8 @@ import com.google.gson.Gson;
*
* @author anthonycorbacho
*/
-public class NotebookServer extends WebSocketServer implements JobListenerFactory {
+public class NotebookServer extends WebSocketServer implements
+ JobListenerFactory, AngularObjectRegistryListener {
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
private static final String DEFAULT_ADDR = "0.0.0.0";
@@ -131,6 +136,9 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
case COMPLETION:
completion(conn, notebook, messagereceived);
break;
+ case ANGULAR_OBJECT_UPDATED:
+ angularObjectUpdated(conn, notebook, messagereceived);
+ break;
default:
broadcastNoteList();
break;
@@ -220,13 +228,28 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
return id;
}
+ private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) {
+ Notebook notebook = notebook();
+ List<Note> notes = notebook.getAllNotes();
+ for (Note note : notes) {
+ List<String> ids = note.getNoteReplLoader().getInterpreters();
+ for (String id : ids) {
+ if (id.equals(interpreterGroupId)) {
+ broadcast(note.id(), m);
+ }
+ }
+ }
+ }
+
private void broadcast(String noteId, Message m) {
- LOG.info("SEND >> " + m.op);
synchronized (noteSocketMap) {
List<WebSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
}
+
+ LOG.info("SEND >> " + m.op);
+
for (WebSocket conn : socketLists) {
conn.send(serializeMessage(m));
}
@@ -264,9 +287,11 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
return;
}
Note note = notebook.getNote(noteId);
+
if (note != null) {
addConnectionToNote(note.id(), conn);
conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
+ sendAllAngularObjects(note, conn);
}
}
@@ -381,6 +406,66 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
conn.send(serializeMessage(resp));
}
+ /**
+ * When angular object updated from client
+ * @param conn
+ * @param notebook
+ * @param fromMessage
+ */
+ private void angularObjectUpdated(WebSocket conn, Notebook notebook,
+ Message fromMessage) {
+ String noteId = (String) fromMessage.get("noteId");
+ String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
+ String varName = (String) fromMessage.get("name");
+ Object varValue = fromMessage.get("value");
+
+ // propagate change to (Remote) AngularObjectRegistry
+ Note note = notebook.getNote(noteId);
+ if (note != null) {
+ List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
+ for (InterpreterSetting setting : settings) {
+ if (setting.getInterpreterGroup() == null) {
+ continue;
+ }
+
+ if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
+ AngularObjectRegistry angularObjectRegistry = setting
+ .getInterpreterGroup().getAngularObjectRegistry();
+ AngularObject ao = angularObjectRegistry.get(varName);
+ if (ao == null) {
+ LOG.warn("Object {} is not binded", varName);
+ } else {
+ // path from client -> server
+ ao.set(varValue, false);
+ }
+
+ break;
+ }
+ }
+ }
+
+ // broadcast change to all web session that uses related interpreter.
+ for (Note n : notebook.getAllNotes()) {
+ List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
+ for (InterpreterSetting setting : settings) {
+ if (setting.getInterpreterGroup() == null) {
+ continue;
+ }
+
+ if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
+ AngularObjectRegistry angularObjectRegistry = setting
+ .getInterpreterGroup().getAngularObjectRegistry();
+ AngularObject ao = angularObjectRegistry.get(varName);
+ this.broadcast(n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", ao)
+ .put("interpreterGroupId", interpreterGroupId)
+ .put("noteId", n.id()));
+ }
+ }
+ }
+ }
+
+
private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
throws IOException {
final String paragraphId = (String) fromMessage.get("id");
@@ -498,4 +583,66 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
public JobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener(this, note);
}
+
+ private void sendAllAngularObjects(Note note, WebSocket conn) {
+ List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
+ if (settings == null || settings.size() == 0) {
+ return;
+ }
+
+ for (InterpreterSetting intpSetting : settings) {
+ AngularObjectRegistry registry = intpSetting.getInterpreterGroup().getAngularObjectRegistry();
+ List<AngularObject> objects = registry.getAll();
+ for (AngularObject object : objects) {
+ conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", object)
+ .put("interpreterGroupId", intpSetting.getInterpreterGroup().getId())
+ .put("noteId", note.id())));
+ }
+ }
+ }
+
+ @Override
+ public void onAdd(String interpreterGroupId, AngularObject object) {
+ onUpdate(interpreterGroupId, object);
+ }
+
+ @Override
+ public void onUpdate(String interpreterGroupId, AngularObject object) {
+ Notebook notebook = notebook();
+
+ List<Note> notes = notebook.getAllNotes();
+ for (Note note : notes) {
+ List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
+ .getInterpreterSettings();
+
+ if (intpSettings.isEmpty()) continue;
+
+ for (InterpreterSetting setting : intpSettings) {
+ if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) {
+ broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", object)
+ .put("interpreterGroupId", interpreterGroupId)
+ .put("noteId", note.id()));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onRemove(String interpreterGroupId, AngularObject object) {
+ Notebook notebook = notebook();
+ List<Note> notes = notebook.getAllNotes();
+ for (Note note : notes) {
+ List<String> ids = note.getNoteReplLoader().getInterpreters();
+ for (String id : ids) {
+ if (id.equals(interpreterGroupId)) {
+ broadcast(
+ note.id(),
+ new Message(OP.ANGULAR_OBJECT_REMOVE).put("name",
+ object.getName()));
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index 3e63503..a3bf289 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -79,7 +79,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
assertThat(get, isAllowed());
Map<String, Object> resp = gson.fromJson(get.getResponseBodyAsString(), new TypeToken<Map<String, Object>>(){}.getType());
Map<String, Object> body = (Map<String, Object>) resp.get("body");
- assertEquals(6, body.size());
+ assertEquals(7, body.size());
get.releaseConnection();
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-web/app/scripts/controllers/main.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/app/scripts/controllers/main.js b/zeppelin-web/app/scripts/controllers/main.js
index 535cf78..4948b49 100644
--- a/zeppelin-web/app/scripts/controllers/main.js
+++ b/zeppelin-web/app/scripts/controllers/main.js
@@ -24,7 +24,7 @@
*/
angular.module('zeppelinWebApp')
.controller('MainCtrl', function($scope, WebSocket, $rootScope, $window) {
-
+ $rootScope.compiledScope = $scope.$new(true, $rootScope);
$scope.WebSocketWaitingList = [];
$scope.connected = false;
$scope.looknfeel = 'default';
@@ -65,6 +65,8 @@ angular.module('zeppelinWebApp')
$scope.$broadcast('updateProgress', data);
} else if (op === 'COMPLETION_LIST') {
$scope.$broadcast('completionList', data);
+ } else if (op === 'ANGULAR_OBJECT_UPDATE') {
+ $scope.$broadcast('angularObjectUpdate', data);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-web/app/scripts/controllers/notebook.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/app/scripts/controllers/notebook.js b/zeppelin-web/app/scripts/controllers/notebook.js
index cc295dc..9a9fcd7 100644
--- a/zeppelin-web/app/scripts/controllers/notebook.js
+++ b/zeppelin-web/app/scripts/controllers/notebook.js
@@ -44,6 +44,8 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
$scope.interpreterSettings = [];
$scope.interpreterBindings = [];
+ var angularObjectRegistry = {};
+
$scope.getCronOptionNameFromValue = function(value) {
if (!value) {
return '';
@@ -442,4 +444,51 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
return true;
}
};
+
+ $scope.$on('angularObjectUpdate', function(event, data) {
+ if (data.noteId === $scope.note.id) {
+ var scope = $rootScope.compiledScope;
+ var varName = data.angularObject.name;
+
+ if (angular.equals(data.angularObject.object, scope[varName])) {
+ // return when update has no change
+ return;
+ }
+
+ if (!angularObjectRegistry[varName]) {
+ angularObjectRegistry[varName] = {
+ interpreterGroupId : data.interpreterGroupId,
+ }
+ }
+
+ angularObjectRegistry[varName].skipEmit = true;
+
+ if (!angularObjectRegistry[varName].clearWatcher) {
+ angularObjectRegistry[varName].clearWatcher = scope.$watch(varName, function(newValue, oldValue) {
+ if (angularObjectRegistry[varName].skipEmit) {
+ angularObjectRegistry[varName].skipEmit = false;
+ return;
+ }
+
+ $rootScope.$emit('sendNewEvent', {
+ op: 'ANGULAR_OBJECT_UPDATED',
+ data: {
+ noteId: $routeParams.noteId,
+ name:varName,
+ value:newValue,
+ interpreterGroupId:angularObjectRegistry[varName].interpreterGroupId
+ }
+ });
+ });
+ }
+ scope[varName] = data.angularObject.object;
+ }
+
+ });
+
+ var isFunction = function(functionToCheck) {
+ var getType = {};
+ return functionToCheck && getType.toString.call(functionToCheck) === '[object Function]';
+ }
+
});
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-web/app/scripts/controllers/paragraph.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/app/scripts/controllers/paragraph.js b/zeppelin-web/app/scripts/controllers/paragraph.js
index aac8a78..3b1d05f 100644
--- a/zeppelin-web/app/scripts/controllers/paragraph.js
+++ b/zeppelin-web/app/scripts/controllers/paragraph.js
@@ -25,7 +25,7 @@
* @author anthonycorbacho
*/
angular.module('zeppelinWebApp')
- .controller('ParagraphCtrl', function($scope, $rootScope, $route, $window, $element, $routeParams, $location, $timeout) {
+ .controller('ParagraphCtrl', function($scope, $rootScope, $route, $window, $element, $routeParams, $location, $timeout, $compile) {
$scope.paragraph = null;
$scope.editor = null;
@@ -56,6 +56,8 @@ angular.module('zeppelinWebApp')
$scope.setGraphMode($scope.getGraphMode(), false, false);
} else if ($scope.getResultType() === 'HTML') {
$scope.renderHtml();
+ } else if ($scope.getResultType() === 'ANGULAR') {
+ $scope.renderAngular();
}
};
@@ -77,6 +79,25 @@ angular.module('zeppelinWebApp')
};
+ $scope.renderAngular = function() {
+ var retryRenderer = function() {
+ if (angular.element('#p'+$scope.paragraph.id+'_angular').length) {
+ try {
+ angular.element('#p'+$scope.paragraph.id+'_angular').html($scope.paragraph.result.msg);
+
+ $compile(angular.element('#p'+$scope.paragraph.id+'_angular').contents())($rootScope.compiledScope);
+ } catch(err) {
+ console.log('ANGULAR rendering error %o', err);
+ }
+ } else {
+ $timeout(retryRenderer,10);
+ }
+ };
+ $timeout(retryRenderer);
+
+ };
+
+
var initializeDefault = function() {
var config = $scope.paragraph.config;
@@ -210,6 +231,8 @@ angular.module('zeppelinWebApp')
}
} else if (newType === 'HTML') {
$scope.renderHtml();
+ } else if (newType === 'ANGULAR') {
+ $scope.renderAngular();
}
}
});
@@ -1593,5 +1616,4 @@ angular.module('zeppelinWebApp')
var redirectToUrl = location.protocol + '//' + location.host + '/#/notebook/' + noteId + '/paragraph/' + $scope.paragraph.id+'?asIframe';
$window.open(redirectToUrl);
};
-
});
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-web/app/views/paragraph.html
----------------------------------------------------------------------
diff --git a/zeppelin-web/app/views/paragraph.html b/zeppelin-web/app/views/paragraph.html
index c77c85a..ef4daaa 100644
--- a/zeppelin-web/app/views/paragraph.html
+++ b/zeppelin-web/app/views/paragraph.html
@@ -353,6 +353,11 @@ limitations under the License.
ng-Init="loadResultType(paragraph.result)">
</div>
+ <div id="p{{paragraph.id}}_angular"
+ ng-if="paragraph.result.type == 'ANGULAR'"
+ ng-Init="loadResultType(paragraph.result)">
+ </div>
+
<img id="{{paragraph.id}}_img"
ng-if="paragraph.result.type == 'IMG'"
ng-Init="loadResultType(paragraph.result)"
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 4b0d96c..c6c3b82 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -385,6 +385,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.spark.SparkSqlInterpreter,"
+ "org.apache.zeppelin.spark.DepInterpreter,"
+ "org.apache.zeppelin.markdown.Markdown,"
+ + "org.apache.zeppelin.angular.AngularInterpreter,"
+ "org.apache.zeppelin.shell.ShellInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 7c81e90..c8fc485 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -43,7 +43,10 @@ import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,15 +76,21 @@ public class InterpreterFactory {
private InterpreterOption defaultOption;
- public InterpreterFactory(ZeppelinConfiguration conf) throws InterpreterException, IOException {
- this(conf, new InterpreterOption(true));
+ AngularObjectRegistryListener angularObjectRegistryListener;
+
+ public InterpreterFactory(ZeppelinConfiguration conf,
+ AngularObjectRegistryListener angularObjectRegistryListener)
+ throws InterpreterException, IOException {
+ this(conf, new InterpreterOption(true), angularObjectRegistryListener);
}
- public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption)
+ public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
+ AngularObjectRegistryListener angularObjectRegistryListener)
throws InterpreterException, IOException {
this.conf = conf;
this.defaultOption = defaultOption;
+ this.angularObjectRegistryListener = angularObjectRegistryListener;
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
interpreterClassList = replsConf.split(",");
@@ -217,17 +226,20 @@ public class InterpreterFactory {
// previously created setting should turn this feature on here.
setting.getOption().setRemote(true);
- InterpreterGroup interpreterGroup = createInterpreterGroup(
- setting.getGroup(),
- setting.getOption(),
- setting.getProperties());
+
InterpreterSetting intpSetting = new InterpreterSetting(
setting.id(),
setting.getName(),
setting.getGroup(),
+ setting.getOption());
+
+ InterpreterGroup interpreterGroup = createInterpreterGroup(
+ setting.id(),
+ setting.getGroup(),
setting.getOption(),
- interpreterGroup);
+ setting.getProperties());
+ intpSetting.setInterpreterGroup(interpreterGroup);
interpreterSettings.put(k, intpSetting);
}
@@ -320,25 +332,46 @@ public class InterpreterFactory {
InterpreterOption option, Properties properties)
throws InterpreterException, IOException {
synchronized (interpreterSettings) {
- InterpreterGroup interpreterGroup = createInterpreterGroup(groupName, option, properties);
InterpreterSetting intpSetting = new InterpreterSetting(
name,
groupName,
- option,
- interpreterGroup);
- interpreterSettings.put(intpSetting.id(), intpSetting);
+ option);
+
+ InterpreterGroup interpreterGroup = createInterpreterGroup(
+ intpSetting.id(), groupName, option, properties);
+ intpSetting.setInterpreterGroup(interpreterGroup);
+
+ interpreterSettings.put(intpSetting.id(), intpSetting);
saveToFile();
return interpreterGroup;
}
}
- private InterpreterGroup createInterpreterGroup(String groupName,
+ private InterpreterGroup createInterpreterGroup(String id,
+ String groupName,
InterpreterOption option,
Properties properties)
throws InterpreterException {
- InterpreterGroup interpreterGroup = new InterpreterGroup();
+
+ AngularObjectRegistry angularObjectRegistry;
+
+ InterpreterGroup interpreterGroup = new InterpreterGroup(id);
+ if (option.isRemote()) {
+ angularObjectRegistry = new RemoteAngularObjectRegistry(
+ id,
+ angularObjectRegistryListener,
+ interpreterGroup
+ );
+ } else {
+ angularObjectRegistry = new AngularObjectRegistry(
+ id,
+ angularObjectRegistryListener);
+ }
+
+ interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
+
for (String className : interpreterClassList) {
Set<String> keys = Interpreter.registeredInterpreters.keySet();
@@ -480,6 +513,7 @@ public class InterpreterFactory {
intpsetting.setOption(option);
InterpreterGroup interpreterGroup = createInterpreterGroup(
+ intpsetting.id(),
intpsetting.getGroup(), option, properties);
intpsetting.setInterpreterGroup(interpreterGroup);
saveToFile();
@@ -499,6 +533,7 @@ public class InterpreterFactory {
intpsetting.getInterpreterGroup().destroy();
InterpreterGroup interpreterGroup = createInterpreterGroup(
+ intpsetting.id(),
intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties());
intpsetting.setInterpreterGroup(interpreterGroup);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 04785aa..301ed23 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -36,21 +36,17 @@ public class InterpreterSetting {
public InterpreterSetting(String id, String name,
String group,
- InterpreterOption option,
- InterpreterGroup interpreterGroup) {
+ InterpreterOption option) {
this.id = id;
this.name = name;
this.group = group;
- this.properties = interpreterGroup.getProperty();
this.option = option;
- this.interpreterGroup = interpreterGroup;
}
public InterpreterSetting(String name,
String group,
- InterpreterOption option,
- InterpreterGroup interpreterGroup) {
- this(generateId(), name, group, option, interpreterGroup);
+ InterpreterOption option) {
+ this(generateId(), name, group, option);
}
public String id() {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 9204a07..b5e68a4 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -28,17 +28,21 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +58,8 @@ public class Note implements Serializable, JobListener {
private String name;
private String id;
+ Map<String, List<AngularObject>> angularObjects = new HashMap<String, List<AngularObject>>();
+
private transient NoteInterpreterLoader replLoader;
private transient ZeppelinConfiguration conf;
private transient JobListenerFactory jobListenerFactory;
@@ -110,13 +116,17 @@ public class Note implements Serializable, JobListener {
this.conf = conf;
}
+ public Map<String, List<AngularObject>> getAngularObjects() {
+ return angularObjects;
+ }
+
/**
* Add paragraph last.
*
* @param p
*/
public Paragraph addParagraph() {
- Paragraph p = new Paragraph(this, replLoader);
+ Paragraph p = new Paragraph(this, this, replLoader);
synchronized (paragraphs) {
paragraphs.add(p);
}
@@ -130,7 +140,7 @@ public class Note implements Serializable, JobListener {
* @param p
*/
public Paragraph insertParagraph(int index) {
- Paragraph p = new Paragraph(this, replLoader);
+ Paragraph p = new Paragraph(this, this, replLoader);
synchronized (paragraphs) {
paragraphs.add(index, p);
}
@@ -268,6 +278,21 @@ public class Note implements Serializable, JobListener {
}
}
+ private void snapshotAngularObjectRegistry() {
+ angularObjects = new HashMap<String, List<AngularObject>>();
+
+ List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
+ if (settings == null || settings.size() == 0) {
+ return;
+ }
+
+ for (InterpreterSetting setting : settings) {
+ InterpreterGroup intpGroup = setting.getInterpreterGroup();
+ AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
+ angularObjects.put(intpGroup.getId(), registry.getAll());
+ }
+ }
+
public void persist() throws IOException {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.setPrettyPrinting();
@@ -283,6 +308,7 @@ public class Note implements Serializable, JobListener {
File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
logger().info("Persist note {} into {}", id, file.getAbsolutePath());
+ snapshotAngularObjectRegistry();
String json = gson.toJson(this);
FileOutputStream out = new FileOutputStream(file);
out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING)));
@@ -316,11 +342,12 @@ public class Note implements Serializable, JobListener {
note.setReplLoader(replLoader);
note.jobListenerFactory = jobListenerFactory;
for (Paragraph p : note.paragraphs) {
+ p.setNote(note);
+
if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
p.setStatus(Status.ABORT);
}
}
-
return note;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 2d9ba36..844763f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -29,7 +31,10 @@ import java.util.Map;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -160,6 +165,10 @@ public class Notebook {
if (dirs == null) {
return;
}
+
+ Map<String, SnapshotAngularObject> angularObjectSnapshot =
+ new HashMap<String, SnapshotAngularObject>();
+
for (File f : dirs) {
boolean isHidden = f.getName().startsWith(".");
if (f.isDirectory() && !isHidden) {
@@ -174,18 +183,84 @@ public class Notebook {
jobListenerFactory, quartzSched);
noteInterpreterLoader.setNoteId(note.id());
+ // restore angular object --------------
+ Date lastUpdatedDate = new Date(0);
+ for (Paragraph p : note.getParagraphs()) {
+ if (p.getDateFinished() != null &&
+ lastUpdatedDate.before(p.getDateFinished())) {
+ lastUpdatedDate = p.getDateFinished();
+ }
+ }
+
+ Map<String, List<AngularObject>> savedObjects = note.getAngularObjects();
+
+ if (savedObjects != null) {
+ for (String intpGroupName : savedObjects.keySet()) {
+ List<AngularObject> objectList = savedObjects.get(intpGroupName);
+
+ for (AngularObject savedObject : objectList) {
+ SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName());
+ if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) {
+ angularObjectSnapshot.put(
+ savedObject.getName(),
+ new SnapshotAngularObject(
+ intpGroupName,
+ savedObject,
+ lastUpdatedDate));
+ }
+ }
+ }
+ }
+
synchronized (notes) {
notes.put(note.id(), note);
refreshCron(note.id());
}
}
}
+
+ for (String name : angularObjectSnapshot.keySet()) {
+ SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
+ List<InterpreterSetting> settings = replFactory.get();
+ for (InterpreterSetting setting : settings) {
+ InterpreterGroup intpGroup = setting.getInterpreterGroup();
+ if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
+ AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
+ if (registry.get(name) == null) {
+ registry.add(name, snapshot.getAngularObject().get(), false);
+ }
+ }
+ }
+ }
+ }
+
+ class SnapshotAngularObject {
+ String intpGroupId;
+ AngularObject angularObject;
+ Date lastUpdate;
+
+ public SnapshotAngularObject(String intpGroupId,
+ AngularObject angularObject, Date lastUpdate) {
+ super();
+ this.intpGroupId = intpGroupId;
+ this.angularObject = angularObject;
+ this.lastUpdate = lastUpdate;
+ }
+
+ public String getIntpGroupId() {
+ return intpGroupId;
+ }
+ public AngularObject getAngularObject() {
+ return angularObject;
+ }
+ public Date getLastUpdate() {
+ return lastUpdate;
+ }
}
public List<Note> getAllNotes() {
synchronized (notes) {
List<Note> noteList = new ArrayList<Note>(notes.values());
- logger.info("" + noteList.size());
Collections.sort(noteList, new Comparator() {
@Override
public int compare(Object one, Object two) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index e0986bf..79dfc3d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -19,16 +19,20 @@ package org.apache.zeppelin.notebook;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.Interpreter.FormType;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
import org.slf4j.Logger;
@@ -42,14 +46,16 @@ import org.slf4j.LoggerFactory;
public class Paragraph extends Job implements Serializable {
private static final transient long serialVersionUID = -6328572073497992016L;
private transient NoteInterpreterLoader replLoader;
+ private transient Note note;
String title;
String text;
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
public final GUI settings; // form and parameter settings
- public Paragraph(JobListener listener, NoteInterpreterLoader replLoader) {
+ public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) {
super(generateId(), listener);
+ this.note = note;
this.replLoader = replLoader;
title = null;
text = null;
@@ -79,6 +85,14 @@ public class Paragraph extends Job implements Serializable {
this.title = title;
}
+ public void setNote(Note note) {
+ this.note = note;
+ }
+
+ public Note getNote() {
+ return note;
+ }
+
public String getRequiredReplName() {
return getRequiredReplName(text);
}
@@ -207,14 +221,43 @@ public class Paragraph extends Job implements Serializable {
}
private InterpreterContext getInterpreterContext() {
+ AngularObjectRegistry registry = null;
+
+ if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) {
+ InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0);
+ registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry();
+ }
+
+ List<InterpreterContextRunner> runners = new LinkedList<InterpreterContextRunner>();
+ for (Paragraph p : note.getParagraphs()) {
+ runners.add(new ParagraphRunner(note, note.id(), p.getId()));
+ }
+
InterpreterContext interpreterContext = new InterpreterContext(getId(),
this.getTitle(),
this.getText(),
this.getConfig(),
- this.settings);
+ this.settings,
+ registry,
+ runners);
return interpreterContext;
}
+ static class ParagraphRunner extends InterpreterContextRunner {
+ private Note note;
+
+ public ParagraphRunner(Note note, String noteId, String paragraphId) {
+ super(noteId, paragraphId);
+ this.note = note;
+ }
+
+ @Override
+ public void run() {
+ note.run(getParagraphId());
+ }
+ }
+
+
private Logger logger() {
Logger logger = LoggerFactory.getLogger(Paragraph.class);
return logger;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index 5199300..63aef0d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -54,8 +54,8 @@ public class InterpreterFactoryTest {
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = new ZeppelinConfiguration();
- factory = new InterpreterFactory(conf, new InterpreterOption(false));
- context = new InterpreterContext("id", "title", "text", null, null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
+ context = new InterpreterContext("id", "title", "text", null, null, null, null);
}
@@ -122,7 +122,7 @@ public class InterpreterFactoryTest {
factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties());
assertEquals(3, factory.get().size());
- InterpreterFactory factory2 = new InterpreterFactory(conf);
+ InterpreterFactory factory2 = new InterpreterFactory(conf, null);
assertEquals(3, factory2.get().size());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 88af541..8d2c65a 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -69,7 +69,7 @@ public class NotebookTest implements JobListenerFactory{
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- factory = new InterpreterFactory(conf, new InterpreterOption(false));
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
notebook = new Notebook(conf, schedulerFactory, factory, this);
}
@@ -108,7 +108,7 @@ public class NotebookTest implements JobListenerFactory{
p1.setText("hello world");
note.persist();
- Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf), this);
+ Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf, null), this);
assertEquals(1, notebook2.getAllNotes().size());
}
[3/3] incubator-zeppelin git commit: ZEPPELIN-25 Ability to create
rich gui inside of Notebook
Posted by mo...@apache.org.
ZEPPELIN-25 Ability to create rich gui inside of Notebook
This PR implements https://issues.apache.org/jira/browse/ZEPPELIN-25
Here's a short video demo of this feature.
[![IMAGE ALT TEXT HERE](http://img.youtube.com/vi/xU5TBS_MsAs/0.jpg)](http://www.youtube.com/watch?v=xU5TBS_MsAs)
for someone who want to try, here's api
```scala
// bind 'varName' variable with 'v' value
z.angularBind(varName: String, v: Object)
// unbind 'varName'
z.angularUnbind(varName: String)
// get value of 'varName'
z.angular(varName:String)
// add watcher to 'varName' variable.
// that is monitoring change and run 'func' when it is changed.
z.angularWatch(varName:String, func: (Object, Object) => Unit))
// remove watcher from 'varName'
z.angularUnwatch(varName:String)
```
Any paragraph's output starting with '%angular' will be considered as angular code. %angular as a interpreter also available.
![image](https://cloud.githubusercontent.com/assets/1540981/7003457/a1e6fe2a-dc95-11e4-8272-380f11c6ae81.png)
Any feedback is welcome!
Author: Lee moon soo <mo...@apache.org>
Closes #27 from Leemoonsoo/angular and squashes the following commits:
04d7175 [Lee moon soo] Remove implicit conversion because of side effect
34fa298 [Lee moon soo] jquery to angular
8076098 [Lee moon soo] Remove unnecessary type information
88fd635 [Lee moon soo] catch and print watcher user provided routine exception
46dba2f [Lee moon soo] Catch sql syntax error
2ebfa59 [Lee moon soo] Let z.run optionally take InterpreterContext
ee29866 [Lee moon soo] ZEPPELIN-32 implement z.show()
dac416d [Lee moon soo] Implement z.run()
0899011 [Lee moon soo] Fix test
0033d32 [Lee moon soo] Add angular interpreter
42ee479 [Lee moon soo] com.nflabs -> org.apache
4d32d19 [Lee moon soo] ZEPPELIN-25 prevent watcher called multiple times
d4d270e [Lee moon soo] ZEPPELIN-25 add unittest
6ce8f36 [Lee moon soo] ZEPPELIN-25 implement watcher
6df7f23 [Lee moon soo] ZEPPELIN-25 broadcast angular object change to related notes
5954e29 [Lee moon soo] ZEPPELIN-25 save/restore angular object registry snapshot to the notebook file
c288198 [Lee moon soo] ZEPPELIN-25 send scope variables when loading note
67f6926 [Lee moon soo] ZEPPELIN-25 impelemnet JS(angular) -JVM(scala) two-way binding
bb52d7b [Lee moon soo] Add %angular display system
a7c77b8 [Lee moon soo] Update license of ScreenCaptureHtmlUnitDriver.java
6d7e063 [Lee moon soo] Add source file license header
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/58b70e3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/58b70e3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/58b70e3b
Branch: refs/heads/master
Commit: 58b70e3bc0b9fb814a5919037318a79ae67f678f
Parents: c0a7d08
Author: Lee moon soo <mo...@apache.org>
Authored: Sat Apr 11 11:15:05 2015 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Sun Apr 12 14:58:32 2015 +0900
----------------------------------------------------------------------
angular/pom.xml | 127 ++
.../zeppelin/angular/AngularInterpreter.java | 81 +
conf/zeppelin-site.xml.template | 2 +-
pom.xml | 1 +
.../apache/zeppelin/spark/SparkInterpreter.java | 6 +-
.../zeppelin/spark/SparkSqlInterpreter.java | 84 +-
.../apache/zeppelin/spark/ZeppelinContext.java | 348 ++++-
.../zeppelin/spark/DepInterpreterTest.java | 9 +-
.../zeppelin/spark/SparkInterpreterTest.java | 10 +-
.../zeppelin/spark/SparkSqlInterpreterTest.java | 14 +-
.../apache/zeppelin/display/AngularObject.java | 129 ++
.../zeppelin/display/AngularObjectListener.java | 25 +
.../zeppelin/display/AngularObjectRegistry.java | 106 ++
.../display/AngularObjectRegistryListener.java | 28 +
.../zeppelin/display/AngularObjectWatcher.java | 37 +
.../interpreter/InterpreterContext.java | 19 +-
.../interpreter/InterpreterContextRunner.java | 58 +
.../zeppelin/interpreter/InterpreterGroup.java | 21 +-
.../zeppelin/interpreter/InterpreterResult.java | 1 +
.../remote/InterpreterContextRunnerPool.java | 88 ++
.../interpreter/remote/RemoteAngularObject.java | 50 +
.../remote/RemoteAngularObjectRegistry.java | 67 +
.../interpreter/remote/RemoteInterpreter.java | 20 +-
.../remote/RemoteInterpreterContextRunner.java | 38 +
.../remote/RemoteInterpreterEventPoller.java | 126 ++
.../remote/RemoteInterpreterProcess.java | 50 +-
.../remote/RemoteInterpreterServer.java | 140 +-
.../thrift/RemoteInterpreterContext.java | 108 +-
.../thrift/RemoteInterpreterEvent.java | 502 ++++++
.../thrift/RemoteInterpreterEventType.java | 54 +
.../thrift/RemoteInterpreterService.java | 1462 ++++++++++++++++++
.../zeppelin/scheduler/ExecutorFactory.java | 83 +
.../zeppelin/scheduler/SchedulerFactory.java | 9 +-
.../main/thrift/RemoteInterpreterService.thrift | 19 +-
.../display/AngularObjectRegistryTest.java | 67 +
.../zeppelin/display/AngularObjectTest.java | 78 +
.../remote/RemoteAngularObjectTest.java | 144 ++
.../remote/RemoteInterpreterProcessTest.java | 16 +-
.../remote/RemoteInterpreterTest.java | 32 +-
.../remote/mock/MockInterpreterAngular.java | 117 ++
.../zeppelin/scheduler/RemoteSchedulerTest.java | 12 +-
.../apache/zeppelin/server/ZeppelinServer.java | 2 +-
.../org/apache/zeppelin/socket/Message.java | 5 +
.../apache/zeppelin/socket/NotebookServer.java | 151 +-
.../zeppelin/rest/ZeppelinRestApiTest.java | 2 +-
zeppelin-web/app/scripts/controllers/main.js | 4 +-
.../app/scripts/controllers/notebook.js | 49 +
.../app/scripts/controllers/paragraph.js | 26 +-
zeppelin-web/app/views/paragraph.html | 5 +
.../zeppelin/conf/ZeppelinConfiguration.java | 1 +
.../interpreter/InterpreterFactory.java | 63 +-
.../interpreter/InterpreterSetting.java | 10 +-
.../java/org/apache/zeppelin/notebook/Note.java | 37 +-
.../org/apache/zeppelin/notebook/Notebook.java | 77 +-
.../org/apache/zeppelin/notebook/Paragraph.java | 49 +-
.../interpreter/InterpreterFactoryTest.java | 6 +-
.../apache/zeppelin/notebook/NotebookTest.java | 4 +-
57 files changed, 4676 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/angular/pom.xml
----------------------------------------------------------------------
diff --git a/angular/pom.xml b/angular/pom.xml
new file mode 100644
index 0000000..580b848
--- /dev/null
+++ b/angular/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.5.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-angular</artifactId>
+ <packaging>jar</packaging>
+ <version>0.5.0-SNAPSHOT</version>
+ <name>Zeppelin: Angular interpreter</name>
+ <url>http://zeppelin.incubator.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>zeppelin-interpreter</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.7</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/angular</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-artifact</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/angular</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ <artifactItems>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>${project.artifactId}</artifactId>
+ <version>${project.version}</version>
+ <type>${project.packaging}</type>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java
----------------------------------------------------------------------
diff --git a/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java
new file mode 100644
index 0000000..c7a406d
--- /dev/null
+++ b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.angular;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+/**
+ *
+ */
+public class AngularInterpreter extends Interpreter {
+ static {
+ Interpreter.register("angular", AngularInterpreter.class.getName());
+ }
+
+ public AngularInterpreter(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ return new InterpreterResult(Code.SUCCESS, Type.ANGULAR, st);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<String> completion(String buf, int cursor) {
+ return new LinkedList<String>();
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ AngularInterpreter.class.getName() + this.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 32011b1..13d794c 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -60,7 +60,7 @@
<property>
<name>zeppelin.interpreters</name>
- <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter</value>
+ <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 20a073e..d2fc77c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
<module>zeppelin-zengine</module>
<module>spark</module>
<module>markdown</module>
+ <module>angular</module>
<module>shell</module>
<module>hive</module>
<module>zeppelin-web</module>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 2461109..b038dd6 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -107,6 +107,7 @@ public class SparkInterpreter extends Interpreter {
+ "we should set this value")
.add("zeppelin.spark.useHiveContext", "true",
"Use HiveContext instead of SQLContext if it is true.")
+ .add("zeppelin.spark.maxResult", "1000", "Max number of SparkSQL result to display.")
.add("args", "", "spark commandline args").build());
}
@@ -398,7 +399,8 @@ public class SparkInterpreter extends Interpreter {
dep = getDependencyResolver();
- z = new ZeppelinContext(sc, sqlc, null, dep, printStream);
+ z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
+ Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
try {
if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
@@ -510,7 +512,7 @@ public class SparkInterpreter extends Interpreter {
}
String getJobGroup(InterpreterContext context){
- return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
+ return "zeppelin-" + context.getParagraphId();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 2555988..618579d 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -29,18 +29,15 @@ import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Stage;
import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SQLContext.QueryExecution;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@@ -76,7 +73,7 @@ public class SparkSqlInterpreter extends Interpreter {
}
private String getJobGroup(InterpreterContext context){
- return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
+ return "zeppelin-" + context.getParagraphId();
}
private int maxResult;
@@ -126,82 +123,13 @@ public class SparkSqlInterpreter extends Interpreter {
sc.setLocalProperty("spark.scheduler.pool", null);
}
- sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-
- // SchemaRDD - spark 1.1, 1.2, DataFrame - spark 1.3
- Object rdd;
- Object[] rows = null;
try {
- rdd = sqlc.sql(st);
-
- Method take = rdd.getClass().getMethod("take", int.class);
- rows = (Object[]) take.invoke(rdd, maxResult + 1);
+ Object rdd = sqlc.sql(st);
+ String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult);
+ return new InterpreterResult(Code.SUCCESS, msg);
} catch (Exception e) {
- logger.error("Error", e);
- sc.clearJobGroup();
- return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
- }
-
- String msg = null;
-
- // get field names
- Method queryExecution;
- QueryExecution qe;
- try {
- queryExecution = rdd.getClass().getMethod("queryExecution");
- qe = (QueryExecution) queryExecution.invoke(rdd);
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- throw new InterpreterException(e);
- }
-
- List<Attribute> columns =
- scala.collection.JavaConverters.asJavaListConverter(
- qe.analyzed().output()).asJava();
-
- for (Attribute col : columns) {
- if (msg == null) {
- msg = col.name();
- } else {
- msg += "\t" + col.name();
- }
- }
-
- msg += "\n";
-
- // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
- // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
- // NullType, NumericType, ShortType, StringType, StructType
-
- try {
- for (int r = 0; r < maxResult && r < rows.length; r++) {
- Object row = rows[r];
- Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
- Method apply = row.getClass().getMethod("apply", int.class);
-
- for (int i = 0; i < columns.size(); i++) {
- if (!(Boolean) isNullAt.invoke(row, i)) {
- msg += apply.invoke(row, i).toString();
- } else {
- msg += "null";
- }
- if (i != columns.size() - 1) {
- msg += "\t";
- }
- }
- msg += "\n";
- }
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- throw new InterpreterException(e);
- }
-
- if (rows.length > maxResult) {
- msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>";
+ return new InterpreterResult(Code.ERROR, e.getMessage());
}
- InterpreterResult rett = new InterpreterResult(Code.SUCCESS, "%table " + msg);
- sc.clearJobGroup();
- return rett;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 87cd188..2c03f1c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -22,19 +22,31 @@ import static scala.collection.JavaConversions.asJavaIterable;
import static scala.collection.JavaConversions.collectionAsScalaIterable;
import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SQLContext.QueryExecution;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.hive.HiveContext;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.spark.dep.DependencyResolver;
import scala.Tuple2;
+import scala.Unit;
import scala.collection.Iterable;
/**
@@ -47,15 +59,18 @@ public class ZeppelinContext extends HashMap<String, Object> {
private DependencyResolver dep;
private PrintStream out;
private InterpreterContext interpreterContext;
+ private int maxResult;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
- DependencyResolver dep, PrintStream printStream) {
+ DependencyResolver dep, PrintStream printStream,
+ int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
this.out = printStream;
+ this.maxResult = maxResult;
}
public SparkContext sc;
@@ -63,12 +78,6 @@ public class ZeppelinContext extends HashMap<String, Object> {
public HiveContext hiveContext;
private GUI gui;
- /* spark-1.3
- public SchemaRDD sql(String sql) {
- return sqlContext.sql(sql);
- }
- */
-
/**
* Load dependency for interpreter and runtime (driver).
* And distribute them to spark cluster (sc.add())
@@ -221,25 +230,6 @@ public class ZeppelinContext extends HashMap<String, Object> {
this.gui = o;
}
- public void run(String lines) {
- /*
- String intpName = Paragraph.getRequiredReplName(lines);
- String scriptBody = Paragraph.getScriptBody(lines);
- Interpreter intp = interpreterContext.getParagraph().getRepl(intpName);
- InterpreterResult ret = intp.interpret(scriptBody, interpreterContext);
- if (ret.code() == InterpreterResult.Code.SUCCESS) {
- out.println("%" + ret.type().toString().toLowerCase() + " " + ret.message());
- } else if (ret.code() == InterpreterResult.Code.ERROR) {
- out.println("Error: " + ret.message());
- } else if (ret.code() == InterpreterResult.Code.INCOMPLETE) {
- out.println("Incomplete");
- } else {
- out.println("Unknown error");
- }
- */
- throw new RuntimeException("Missing implementation");
- }
-
private void restartInterpreter() {
}
@@ -251,4 +241,310 @@ public class ZeppelinContext extends HashMap<String, Object> {
this.interpreterContext = interpreterContext;
}
+ public void setMaxResult(int maxResult) {
+ this.maxResult = maxResult;
+ }
+
+ /**
+ * show DataFrame or SchemaRDD
+ * @param o DataFrame or SchemaRDD object
+ */
+ public void show(Object o) {
+ show(o, maxResult);
+ }
+
+ /**
+ * show DataFrame or SchemaRDD
+ * @param o DataFrame or SchemaRDD object
+ * @param maxResult maximum number of rows to display
+ */
+ public void show(Object o, int maxResult) {
+ Class cls = null;
+ try {
+ cls = this.getClass().forName("org.apache.spark.sql.DataFrame");
+ } catch (ClassNotFoundException e) {
+ }
+
+ if (cls == null) {
+ try {
+ cls = this.getClass().forName("org.apache.spark.sql.SchemaRDD");
+ } catch (ClassNotFoundException e) {
+ }
+ }
+
+ if (cls == null) {
+ throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
+ }
+
+ if (cls.isInstance(o)) {
+ out.print(showRDD(sc, interpreterContext, o, maxResult));
+ } else {
+ out.print(o.toString());
+ }
+ }
+
+ public static String showRDD(SparkContext sc,
+ InterpreterContext interpreterContext,
+ Object rdd, int maxResult) {
+ Object[] rows = null;
+ Method take;
+ String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
+ sc.setJobGroup(jobGroup, "Zeppelin", false);
+
+ try {
+ take = rdd.getClass().getMethod("take", int.class);
+ rows = (Object[]) take.invoke(rdd, maxResult + 1);
+
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ sc.clearJobGroup();
+ throw new InterpreterException(e);
+ }
+
+ String msg = null;
+
+ // get field names
+ Method queryExecution;
+ QueryExecution qe;
+ try {
+ queryExecution = rdd.getClass().getMethod("queryExecution");
+ qe = (QueryExecution) queryExecution.invoke(rdd);
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ throw new InterpreterException(e);
+ }
+
+ List<Attribute> columns =
+ scala.collection.JavaConverters.asJavaListConverter(
+ qe.analyzed().output()).asJava();
+
+ for (Attribute col : columns) {
+ if (msg == null) {
+ msg = col.name();
+ } else {
+ msg += "\t" + col.name();
+ }
+ }
+
+ msg += "\n";
+
+ // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
+ // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
+ // NullType, NumericType, ShortType, StringType, StructType
+
+ try {
+ for (int r = 0; r < maxResult && r < rows.length; r++) {
+ Object row = rows[r];
+ Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
+ Method apply = row.getClass().getMethod("apply", int.class);
+
+ for (int i = 0; i < columns.size(); i++) {
+ if (!(Boolean) isNullAt.invoke(row, i)) {
+ msg += apply.invoke(row, i).toString();
+ } else {
+ msg += "null";
+ }
+ if (i != columns.size() - 1) {
+ msg += "\t";
+ }
+ }
+ msg += "\n";
+ }
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ throw new InterpreterException(e);
+ }
+
+ if (rows.length > maxResult) {
+ msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>";
+ }
+ sc.clearJobGroup();
+ return "%table " + msg;
+ }
+
+ /**
+ * Run paragraph by id
+ * @param id
+ */
+ public void run(String id) {
+ run(id, interpreterContext);
+ }
+
+ /**
+ * Run paragraph by id
+ * @param id
+ * @param context
+ */
+ public void run(String id, InterpreterContext context) {
+ if (id.equals(context.getParagraphId())) {
+ throw new InterpreterException("Can not run current Paragraph");
+ }
+
+ for (InterpreterContextRunner r : context.getRunners()) {
+ if (id.equals(r.getParagraphId())) {
+ r.run();
+ return;
+ }
+ }
+
+ throw new InterpreterException("Paragraph " + id + " not found");
+ }
+
+ /**
+ * Run paragraph at idx
+ * @param idx
+ */
+ public void run(int idx) {
+ run(idx, interpreterContext);
+ }
+
+ /**
+ * Run paragraph at index
+ * @param idx index starting from 0
+ * @param context interpreter context
+ */
+ public void run(int idx, InterpreterContext context) {
+ if (idx >= context.getRunners().size()) {
+ throw new InterpreterException("Index out of bound");
+ }
+
+ InterpreterContextRunner runner = context.getRunners().get(idx);
+ if (runner.getParagraphId().equals(context.getParagraphId())) {
+ throw new InterpreterException("Can not run current Paragraph");
+ }
+
+ runner.run();
+ }
+
+ public void run(List<Object> paragraphIdOrIdx) {
+ run(paragraphIdOrIdx, interpreterContext);
+ }
+
+ /**
+ * Run paragraphs
+ * @param paragraphIdOrIdxs list of paragraph id or idx
+ */
+ public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
+ for (Object idOrIdx : paragraphIdOrIdx) {
+ if (idOrIdx instanceof String) {
+ String id = (String) idOrIdx;
+ run(id, context);
+ } else if (idOrIdx instanceof Integer) {
+ Integer idx = (Integer) idOrIdx;
+ run(idx, context);
+ } else {
+ throw new InterpreterException("Paragraph " + idOrIdx + " not found");
+ }
+ }
+ }
+
+ public void runAll() {
+ runAll(interpreterContext);
+ }
+
+ /**
+ * Run all paragraphs. except this.
+ */
+ public void runAll(InterpreterContext context) {
+ for (InterpreterContextRunner r : context.getRunners()) {
+ if (r.getParagraphId().equals(context.getParagraphId())) {
+ // skip itself
+ continue;
+ }
+ r.run();
+ }
+ }
+
+ public List<String> listParagraphs() {
+ List<String> paragraphs = new LinkedList<String>();
+
+ for (InterpreterContextRunner r : interpreterContext.getRunners()) {
+ paragraphs.add(r.getParagraphId());
+ }
+
+ return paragraphs;
+ }
+
+
+
+ public Object angular(String name) {
+ AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+ AngularObject ao = registry.get(name);
+ if (ao == null) {
+ return null;
+ } else {
+ return ao.get();
+ }
+ }
+
+ public void angularBind(String name, Object o) {
+ AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+ if (registry.get(name) == null) {
+ registry.add(name, o);
+ } else {
+ registry.get(name).set(o);
+ }
+ }
+
+ public void angularBind(String name, Object o, AngularObjectWatcher w) {
+ AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+ if (registry.get(name) == null) {
+ registry.add(name, o);
+ } else {
+ registry.get(name).set(o);
+ }
+ angularWatch(name, w);
+ }
+
+ public void angularWatch(String name, AngularObjectWatcher w) {
+ AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+ if (registry.get(name) != null) {
+ registry.get(name).addWatcher(w);
+ }
+ }
+
+
+ public void angularWatch(String name,
+ final scala.Function2<Object, Object, Unit> func) {
+ AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
+ @Override
+ public void watch(Object oldObject, Object newObject,
+ InterpreterContext context) {
+ func.apply(newObject, newObject);
+ }
+ };
+ angularWatch(name, w);
+ }
+
+ public void angularWatch(
+ String name,
+ final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
+ AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
+ @Override
+ public void watch(Object oldObject, Object newObject,
+ InterpreterContext context) {
+ func.apply(oldObject, newObject, context);
+ }
+ };
+ angularWatch(name, w);
+ }
+
+ public void angularUnwatch(String name, AngularObjectWatcher w) {
+ AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+ if (registry.get(name) != null) {
+ registry.get(name).removeWatcher(w);
+ }
+ }
+
+ public void angularUnwatch(String name) {
+ AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+ if (registry.get(name) != null) {
+ registry.get(name).clearAllWatchers();
+ }
+ }
+
+ public void angularUnbind(String name) {
+ AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+ registry.remove(name);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index 8d24cc4..2f8254d 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -21,15 +21,16 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Properties;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.spark.DepInterpreter;
-import org.apache.zeppelin.spark.SparkInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -57,7 +58,9 @@ public class DepInterpreterTest {
intpGroup.add(dep);
dep.setInterpreterGroup(intpGroup);
- context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
+ context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>());
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 20f7fa4..a5e0fe2 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -22,13 +22,16 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Properties;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.spark.SparkInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
@@ -55,7 +58,10 @@ public class SparkInterpreterTest {
repl.open();
}
- context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>());
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 71f088d..27198b3 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -20,15 +20,16 @@ package org.apache.zeppelin.spark;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Properties;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.apache.zeppelin.spark.SparkInterpreter;
-import org.apache.zeppelin.spark.SparkSqlInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -38,6 +39,7 @@ public class SparkSqlInterpreterTest {
private SparkSqlInterpreter sql;
private SparkInterpreter repl;
private InterpreterContext context;
+ private InterpreterGroup intpGroup;
@Before
public void setUp() throws Exception {
@@ -55,13 +57,15 @@ public class SparkSqlInterpreterTest {
sql = new SparkSqlInterpreter(p);
- InterpreterGroup intpGroup = new InterpreterGroup();
+ intpGroup = new InterpreterGroup();
intpGroup.add(repl);
intpGroup.add(sql);
sql.setInterpreterGroup(intpGroup);
sql.open();
}
- context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
+ context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>());
}
@After
@@ -79,7 +83,7 @@ public class SparkSqlInterpreterTest {
assertEquals(Type.TABLE, ret.type());
assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
- assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
+ assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from people", context).code());
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
new file mode 100644
index 0000000..bbfcd1b
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.zeppelin.scheduler.ExecutorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ * @param <T>
+ */
+public class AngularObject<T> {
+ private String name;
+ private T object;
+ private transient AngularObjectListener listener;
+ private transient List<AngularObjectWatcher> watchers
+ = new LinkedList<AngularObjectWatcher>();
+
+ protected AngularObject(String name, T o,
+ AngularObjectListener listener) {
+ this.name = name;
+ this.listener = listener;
+ object = o;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof AngularObject) {
+ return name.equals(((AngularObject) o).name);
+ } else {
+ return false;
+ }
+ }
+
+ public Object get() {
+ return object;
+ }
+
+ public void emit(){
+ if (listener != null) {
+ listener.updated(this);
+ }
+ }
+
+ public void set(T o) {
+ set(o, true);
+ }
+
+ public void set(T o, boolean emit) {
+ final T before = object;
+ final T after = o;
+ object = o;
+ if (emit) {
+ emit();
+ }
+
+ final Logger logger = LoggerFactory.getLogger(AngularObject.class);
+ List<AngularObjectWatcher> ws = new LinkedList<AngularObjectWatcher>();
+ synchronized (watchers) {
+ ws.addAll(watchers);
+ }
+
+ ExecutorService executor = ExecutorFactory.singleton().createOrGet("angularObjectWatcher", 50);
+ for (final AngularObjectWatcher w : ws) {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ w.watch(before, after);
+ } catch (Exception e) {
+ logger.error("Exception on watch", e);
+ }
+ }
+ });
+ }
+ }
+
+ public void setListener(AngularObjectListener listener) {
+ this.listener = listener;
+ }
+
+ public AngularObjectListener getListener() {
+ return listener;
+ }
+
+ public void addWatcher(AngularObjectWatcher watcher) {
+ synchronized (watchers) {
+ watchers.add(watcher);
+ }
+ }
+
+ public void removeWatcher(AngularObjectWatcher watcher) {
+ synchronized (watchers) {
+ watchers.remove(watcher);
+ }
+ }
+
+ public void clearAllWatchers() {
+ synchronized (watchers) {
+ watchers.clear();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java
new file mode 100644
index 0000000..880e487
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+/**
+ *
+ */
+public interface AngularObjectListener {
+ public void updated(AngularObject updatedObject);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
new file mode 100644
index 0000000..56eca22
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ *
+ */
+public class AngularObjectRegistry {
+ Map<String, AngularObject> registry = new HashMap<String, AngularObject>();
+ private AngularObjectRegistryListener listener;
+ private String interpreterId;
+
+ AngularObjectListener angularObjectListener;
+
+ public AngularObjectRegistry(final String interpreterId,
+ final AngularObjectRegistryListener listener) {
+ this.interpreterId = interpreterId;
+ this.listener = listener;
+ angularObjectListener = new AngularObjectListener() {
+ @Override
+ public void updated(AngularObject updatedObject) {
+ if (listener != null) {
+ listener.onUpdate(interpreterId, updatedObject);
+ }
+ }
+ };
+ }
+
+ public AngularObjectRegistryListener getListener() {
+ return listener;
+ }
+
+ public AngularObject add(String name, Object o) {
+ return add(name, o, true);
+ }
+
+ public AngularObject add(String name, Object o, boolean emit) {
+ AngularObject ao = createNewAngularObject(name, o);
+
+ synchronized (registry) {
+ registry.put(name, ao);
+ if (listener != null && emit) {
+ listener.onAdd(interpreterId, ao);
+ }
+ }
+
+ return ao;
+ }
+
+ protected AngularObject createNewAngularObject(String name, Object o) {
+ return new AngularObject(name, o, angularObjectListener);
+ }
+
+ protected AngularObjectListener getAngularObjectListener() {
+ return angularObjectListener;
+ }
+
+ public AngularObject remove(String name) {
+ synchronized (registry) {
+ AngularObject o = registry.remove(name);
+ if (listener != null) {
+ listener.onRemove(interpreterId, o);;
+ }
+ return o;
+ }
+ }
+
+ public AngularObject get(String name) {
+ synchronized (registry) {
+ return registry.get(name);
+ }
+ }
+
+ public List<AngularObject> getAll() {
+ List<AngularObject> all = new LinkedList<AngularObject>();
+ synchronized (registry) {
+ all.addAll(registry.values());
+ }
+ return all;
+ }
+
+ public String getInterpreterGroupId() {
+ return interpreterId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java
new file mode 100644
index 0000000..3f08efa
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+/**
+ *
+ *
+ */
+public interface AngularObjectRegistryListener {
+ public void onAdd(String interpreterGroupId, AngularObject object);
+ public void onUpdate(String interpreterGroupId, AngularObject object);
+ public void onRemove(String interpreterGroupId, AngularObject object);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java
new file mode 100644
index 0000000..c5bd5e2
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+
+/**
+ *
+ */
+public abstract class AngularObjectWatcher {
+ private InterpreterContext context;
+
+ public AngularObjectWatcher(InterpreterContext context) {
+ this.context = context;
+ }
+
+ void watch(Object oldObject, Object newObject) {
+ watch(oldObject, newObject, context);
+ }
+
+ public abstract void watch(Object oldObject, Object newObject, InterpreterContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 2d70c8e..2e4564e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -17,8 +17,10 @@
package org.apache.zeppelin.interpreter;
+import java.util.List;
import java.util.Map;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
/**
@@ -30,19 +32,24 @@ public class InterpreterContext {
private final String paragraphText;
private final Map<String, Object> config;
private GUI gui;
-
+ private AngularObjectRegistry angularObjectRegistry;
+ private List<InterpreterContextRunner> runners;
public InterpreterContext(String paragraphId,
String paragraphTitle,
String paragraphText,
Map<String, Object> config,
- GUI gui
+ GUI gui,
+ AngularObjectRegistry angularObjectRegistry,
+ List<InterpreterContextRunner> runners
) {
this.paragraphId = paragraphId;
this.paragraphTitle = paragraphTitle;
this.paragraphText = paragraphText;
this.config = config;
this.gui = gui;
+ this.angularObjectRegistry = angularObjectRegistry;
+ this.runners = runners;
}
public String getParagraphId() {
@@ -65,4 +72,12 @@ public class InterpreterContext {
return gui;
}
+ public AngularObjectRegistry getAngularObjectRegistry() {
+ return angularObjectRegistry;
+ }
+
+ public List<InterpreterContextRunner> getRunners() {
+ return runners;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java
new file mode 100644
index 0000000..7a2df10
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ */
+public abstract class InterpreterContextRunner implements Runnable {
+ String noteId;
+ private String paragraphId;
+
+ public InterpreterContextRunner(String noteId, String paragraphId) {
+ this.noteId = noteId;
+ this.paragraphId = paragraphId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof InterpreterContextRunner) {
+ InterpreterContextRunner io = ((InterpreterContextRunner) o);
+ if (io.getParagraphId().equals(paragraphId) &&
+ io.getNoteId().equals(noteId)) {
+ return true;
+ } else {
+ return false;
+ }
+
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public abstract void run();
+
+ public String getNoteId() {
+ return noteId;
+ }
+
+ public String getParagraphId() {
+ return paragraphId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 834630a..9baaef3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -21,6 +21,8 @@ import java.util.LinkedList;
import java.util.Properties;
import java.util.Random;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+
/**
* InterpreterGroup is list of interpreters in the same group.
* And unit of interpreter instantiate, restart, bind, unbind.
@@ -28,6 +30,16 @@ import java.util.Random;
public class InterpreterGroup extends LinkedList<Interpreter>{
String id;
+ AngularObjectRegistry angularObjectRegistry;
+
+ public InterpreterGroup(String id) {
+ this.id = id;
+ }
+
+ public InterpreterGroup() {
+ getId();
+ }
+
private static String generateId() {
return "InterpreterGroup_" + System.currentTimeMillis() + "_"
+ new Random().nextInt();
@@ -42,7 +54,6 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
}
}
-
public Properties getProperty() {
Properties p = new Properties();
for (Interpreter intp : this) {
@@ -51,6 +62,14 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
return p;
}
+ public AngularObjectRegistry getAngularObjectRegistry() {
+ return angularObjectRegistry;
+ }
+
+ public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
+ this.angularObjectRegistry = angularObjectRegistry;
+ }
+
public void close() {
for (Interpreter intp : this) {
intp.close();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 0659a47..5d8d96f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -48,6 +48,7 @@ public class InterpreterResult implements Serializable {
public static enum Type {
TEXT,
HTML,
+ ANGULAR,
TABLE,
IMG,
SVG,
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
new file mode 100644
index 0000000..ca2df12
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class InterpreterContextRunnerPool {
+ Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class);
+ private Map<String, List<InterpreterContextRunner>> interpreterContextRunners;
+
+ public InterpreterContextRunnerPool() {
+ interpreterContextRunners = new HashMap<String, List<InterpreterContextRunner>>();
+
+ }
+
+ // add runner
+ public void add(String noteId, InterpreterContextRunner runner) {
+ synchronized (interpreterContextRunners) {
+ if (!interpreterContextRunners.containsKey(noteId)) {
+ interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
+ }
+
+ interpreterContextRunners.get(noteId).add(runner);
+ }
+ }
+
+ // replace all runners to noteId
+ public void addAll(String noteId, List<InterpreterContextRunner> runners) {
+ synchronized (interpreterContextRunners) {
+ if (!interpreterContextRunners.containsKey(noteId)) {
+ interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
+ }
+
+ interpreterContextRunners.get(noteId).addAll(runners);
+ }
+ }
+
+ public void clear(String noteId) {
+ synchronized (interpreterContextRunners) {
+ interpreterContextRunners.remove(noteId);
+ }
+ }
+
+
+ public void run(String noteId, String paragraphId) {
+ synchronized (interpreterContextRunners) {
+ List<InterpreterContextRunner> list = interpreterContextRunners.get(noteId);
+ if (list != null) {
+ for (InterpreterContextRunner r : list) {
+ if (noteId.equals(r.getNoteId()) && paragraphId.equals(r.getParagraphId())) {
+ logger.info("run paragraph {} on note {} from InterpreterContext",
+ r.getParagraphId(), r.getNoteId());
+ r.run();
+ return;
+ }
+ }
+ }
+
+ throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
new file mode 100644
index 0000000..3abd764
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectListener;
+
+/**
+ *
+ */
+public class RemoteAngularObject extends AngularObject {
+
+ private transient RemoteInterpreterProcess remoteInterpreterProcess;
+
+ RemoteAngularObject(String name, Object o, String interpreterGroupId,
+ AngularObjectListener listener,
+ RemoteInterpreterProcess remoteInterpreterProcess) {
+ super(name, o, listener);
+ this.remoteInterpreterProcess = remoteInterpreterProcess;
+ }
+
+ @Override
+ public void set(Object o, boolean emit) {
+ set(o, emit, true);
+ }
+
+ public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) {
+ super.set(o, emitWeb);
+
+ if (emitRemoteProcess) {
+ // send updated value to remote interpreter
+ remoteInterpreterProcess.updateRemoteAngularObject(getName(), o);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
new file mode 100644
index 0000000..c711f69
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+
+/**
+ *
+ */
+public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
+
+ private InterpreterGroup interpreterGroup;
+
+ public RemoteAngularObjectRegistry(String interpreterId,
+ AngularObjectRegistryListener listener,
+ InterpreterGroup interpreterGroup) {
+ super(interpreterId, listener);
+ this.interpreterGroup = interpreterGroup;
+ }
+
+ private RemoteInterpreterProcess getRemoteInterpreterProcess() {
+ if (interpreterGroup.size() == 0) {
+ throw new RuntimeException("Can't get remoteInterpreterProcess");
+ }
+ Interpreter p = interpreterGroup.get(0);
+ while (p instanceof WrappedInterpreter) {
+ p = ((WrappedInterpreter) p).getInnerInterpreter();
+ }
+
+ if (p instanceof RemoteInterpreter) {
+ return ((RemoteInterpreter) p).getInterpreterProcess();
+ } else {
+ throw new RuntimeException("Can't get remoteInterpreterProcess");
+ }
+ }
+
+ @Override
+ protected AngularObject createNewAngularObject(String name, Object o) {
+ RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
+ if (remoteInterpreterProcess == null) {
+ throw new RuntimeException("Remote Interpreter process not found");
+ }
+ return new RemoteAngularObject(name, o, getInterpreterGroupId(),
+ getAngularObjectListener(),
+ getRemoteInterpreterProcess());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index e905d5f..3e6128f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -26,6 +26,7 @@ import org.apache.thrift.TException;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -56,6 +57,8 @@ public class RemoteInterpreter extends Interpreter {
static Map<String, RemoteInterpreterProcess> interpreterGroupReference
= new HashMap<String, RemoteInterpreterProcess>();
+ private InterpreterContextRunnerPool interpreterContextRunnerPool;
+
public RemoteInterpreter(Properties property,
String className,
String interpreterRunner,
@@ -67,6 +70,7 @@ public class RemoteInterpreter extends Interpreter {
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
env = new HashMap<String, String>();
+ interpreterContextRunnerPool = new InterpreterContextRunnerPool();
}
public RemoteInterpreter(Properties property,
@@ -119,7 +123,7 @@ public class RemoteInterpreter extends Interpreter {
}
}
- int rc = interpreterProcess.reference();
+ int rc = interpreterProcess.reference(getInterpreterGroup());
synchronized (interpreterProcess) {
// when first process created
@@ -187,6 +191,15 @@ public class RemoteInterpreter extends Interpreter {
throw new InterpreterException(e1);
}
+ List<InterpreterContextRunner> runners = context.getRunners();
+ if (runners != null && runners.size() != 0) {
+ // assume all runners in this InterpreterContext have the same note id
+ String noteId = runners.get(0).getNoteId();
+
+ interpreterContextRunnerPool.clear(noteId);
+ interpreterContextRunnerPool.addAll(noteId, runners);
+ }
+
try {
GUI settings = context.getGui();
RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
@@ -316,7 +329,7 @@ public class RemoteInterpreter extends Interpreter {
.containsKey(getInterpreterGroupKey(interpreterGroup))) {
interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup),
new RemoteInterpreterProcess(interpreterRunner,
- interpreterPath, env));
+ interpreterPath, env, interpreterContextRunnerPool));
logger.info("setInterpreterGroup = "
+ getInterpreterGroupKey(interpreterGroup) + " class=" + className
@@ -335,7 +348,8 @@ public class RemoteInterpreter extends Interpreter {
ic.getParagraphTitle(),
ic.getParagraphText(),
gson.toJson(ic.getConfig()),
- gson.toJson(ic.getGui()));
+ gson.toJson(ic.getGui()),
+ gson.toJson(ic.getRunners()));
}
private InterpreterResult convert(RemoteInterpreterResult result) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
new file mode 100644
index 0000000..8d16ec5
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+
+/**
+ *
+ */
+public class RemoteInterpreterContextRunner extends InterpreterContextRunner {
+
+ public RemoteInterpreterContextRunner(String noteId, String paragraphId) {
+ super(noteId, paragraphId);
+ }
+
+ @Override
+ public void run() {
+ // this class should be used only for gson deserialize abstract class
+ // code should not reach here
+ throw new InterpreterException("Assert");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
new file mode 100644
index 0000000..dc9ef0b
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ *
+ */
+public class RemoteInterpreterEventPoller extends Thread {
+ Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+ private RemoteInterpreterProcess interpreterProcess;
+ boolean shutdown;
+ private InterpreterGroup interpreterGroup;
+
+ public RemoteInterpreterEventPoller(
+ InterpreterGroup interpreterGroup,
+ RemoteInterpreterProcess interpreterProcess) {
+ this.interpreterGroup = interpreterGroup;
+ this.interpreterProcess = interpreterProcess;
+ shutdown = false;
+ }
+
+ @Override
+ public void run() {
+ Client client = null;
+
+ while (shutdown == false) {
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e1) {
+ logger.error("Can't get RemoteInterpreterEvent", e1);
+ try {
+ synchronized (this) {
+ wait(1000);
+ }
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+
+ RemoteInterpreterEvent event = null;
+ try {
+ event = client.getEvent();
+ } catch (TException e) {
+ logger.error("Can't get RemoteInterpreterEvent", e);
+ try {
+ synchronized (this) {
+ wait(1000);
+ }
+ } catch (InterruptedException e1) {
+ }
+ continue;
+ }
+
+ interpreterProcess.releaseClient(client);
+
+ Gson gson = new Gson();
+
+ AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
+
+ try {
+ if (event.getType() == RemoteInterpreterEventType.NO_OP) {
+ continue;
+ } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
+ AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
+ angularObjectRegistry.add(angularObject.getName(), angularObject.get());
+ } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
+ AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
+ AngularObject localAngularObject = angularObjectRegistry.get(angularObject.getName());
+ if (localAngularObject instanceof RemoteAngularObject) {
+ // to avoid ping-pong loop
+ ((RemoteAngularObject) localAngularObject).set(
+ angularObject.get(), true, false);
+ } else {
+ localAngularObject.set(angularObject.get());
+ }
+ } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
+ AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
+ angularObjectRegistry.remove(angularObject.getName());
+ } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
+ InterpreterContextRunner runnerFromRemote = gson.fromJson(
+ event.getData(), RemoteInterpreterContextRunner.class);
+
+ interpreterProcess.getInterpreterContextRunnerPool().run(
+ runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+ }
+ } catch (Exception e) {
+ logger.error("Can't handle event " + event, e);
+ }
+ }
+ }
+
+ public void shutdown() {
+ shutdown = true;
+ synchronized (this) {
+ notify();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index a128cd7..dbfaa35 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -28,11 +28,15 @@ import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.gson.Gson;
+
/**
*
*/
@@ -48,11 +52,17 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private GenericObjectPool<Client> clientPool;
private Map<String, String> env;
+ private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
+ private InterpreterContextRunnerPool interpreterContextRunnerPool;
- public RemoteInterpreterProcess(String intpRunner, String intpDir, Map<String, String> env) {
+ public RemoteInterpreterProcess(String intpRunner,
+ String intpDir,
+ Map<String, String> env,
+ InterpreterContextRunnerPool interpreterContextRunnerPool) {
this.interpreterRunner = intpRunner;
this.interpreterDir = intpDir;
this.env = env;
+ this.interpreterContextRunnerPool = interpreterContextRunnerPool;
referenceCount = new AtomicInteger(0);
}
@@ -60,7 +70,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
return port;
}
- public int reference() {
+ public int reference(InterpreterGroup interpreterGroup) {
synchronized (referenceCount) {
if (executor == null) {
// start server process
@@ -108,6 +118,9 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
}
clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port));
+
+ remoteInterpreterEventPoller = new RemoteInterpreterEventPoller(interpreterGroup, this);
+ remoteInterpreterEventPoller.start();
}
return referenceCount.incrementAndGet();
}
@@ -126,6 +139,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
int r = referenceCount.decrementAndGet();
if (r == 0) {
logger.info("shutdown interpreter process");
+ remoteInterpreterEventPoller.shutdown();
+
// first try shutdown
try {
Client client = getClient();
@@ -205,4 +220,35 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
return clientPool.getNumIdle();
}
}
+
+ /**
+ * Called when angular object is updated in client side to propagate
+ * change to the remote process
+ * @param name
+ * @param o
+ */
+ public void updateRemoteAngularObject(String name, Object o) {
+ Client client = null;
+ try {
+ client = getClient();
+ } catch (NullPointerException e) {
+ // remote process not started
+ return;
+ } catch (Exception e) {
+ logger.error("Can't update angular object", e);
+ }
+
+ try {
+ Gson gson = new Gson();
+ client.angularObjectUpdate(name, gson.toJson(o));
+ } catch (TException e) {
+ logger.error("Can't update angular object", e);
+ } finally {
+ releaseClient(client);
+ }
+ }
+
+ public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
+ return interpreterContextRunnerPool;
+ }
}
[2/3] incubator-zeppelin git commit: ZEPPELIN-25 Ability to create
rich gui inside of Notebook
Posted by mo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index ee4aa2d..8b4b236 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -21,6 +21,7 @@ package org.apache.zeppelin.interpreter.remote;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -29,39 +30,45 @@ import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.JobProgressPoller;
import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-
/**
*
*/
public class RemoteInterpreterServer
extends Thread
- implements RemoteInterpreterService.Iface {
+ implements RemoteInterpreterService.Iface, AngularObjectRegistryListener {
Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class);
- InterpreterGroup interpreterGroup = new InterpreterGroup();
+ InterpreterGroup interpreterGroup;
+ AngularObjectRegistry angularObjectRegistry;
Gson gson = new Gson();
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
@@ -69,8 +76,14 @@ public class RemoteInterpreterServer
private int port;
private TThreadPoolServer server;
+ List<RemoteInterpreterEvent> eventQueue = new LinkedList<RemoteInterpreterEvent>();
+
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
+ interpreterGroup = new InterpreterGroup();
+ angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
+ interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
+
processor = new RemoteInterpreterService.Processor<RemoteInterpreterServer>(this);
TServerSocket serverTransport = new TServerSocket(port);
server = new TThreadPoolServer(
@@ -300,13 +313,42 @@ public class RemoteInterpreterServer
}
private InterpreterContext convert(RemoteInterpreterContext ric) {
+ List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
+ List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
+ new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
+
+ for (InterpreterContextRunner r : runners) {
+ contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
+ }
+
return new InterpreterContext(
ric.getParagraphId(),
ric.getParagraphTitle(),
ric.getParagraphText(),
(Map<String, Object>) gson.fromJson(ric.getConfig(),
new TypeToken<Map<String, Object>>() {}.getType()),
- gson.fromJson(ric.getGui(), GUI.class));
+ gson.fromJson(ric.getGui(), GUI.class),
+ interpreterGroup.getAngularObjectRegistry(),
+ contextRunners);
+ }
+
+
+ static class ParagraphRunner extends InterpreterContextRunner {
+
+ private transient RemoteInterpreterServer server;
+
+ public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) {
+ super(noteId, paragraphId);
+ this.server = server;
+ }
+
+ @Override
+ public void run() {
+ Gson gson = new Gson();
+ server.sendEvent(new RemoteInterpreterEvent(
+ RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
+ gson.toJson(this)));
+ }
}
private RemoteInterpreterResult convert(InterpreterResult result,
@@ -339,4 +381,90 @@ public class RemoteInterpreterServer
}
return "Unknown";
}
+
+
+
+ @Override
+ public void onAdd(String interpreterGroupId, AngularObject object) {
+ sendEvent(new RemoteInterpreterEvent(
+ RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, gson.toJson(object)));
+ }
+
+ @Override
+ public void onUpdate(String interpreterGroupId, AngularObject object) {
+ sendEvent(new RemoteInterpreterEvent(
+ RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, gson.toJson(object)));
+ }
+
+ @Override
+ public void onRemove(String interpreterGroupId, AngularObject object) {
+ sendEvent(new RemoteInterpreterEvent(
+ RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(object)));
+ }
+
+ private void sendEvent(RemoteInterpreterEvent event) {
+ synchronized (eventQueue) {
+ eventQueue.add(event);
+ eventQueue.notifyAll();
+ }
+ }
+
+ @Override
+ public RemoteInterpreterEvent getEvent() throws TException {
+ synchronized (eventQueue) {
+ if (eventQueue.isEmpty()) {
+ try {
+ eventQueue.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ if (eventQueue.isEmpty()) {
+ return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
+ } else {
+ return eventQueue.remove(0);
+ }
+ }
+ }
+
+ /**
+ * called when object is updated in client (web) side.
+ * @param className
+ * @param name
+ * @param object
+ * @throws TException
+ */
+ @Override
+ public void angularObjectUpdate(String name, String object)
+ throws TException {
+ AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
+ AngularObject ao = registry.get(name);
+ if (ao == null) {
+ logger.error("Angular object {} not exists", name);
+ return;
+ }
+
+ if (object == null) {
+ ao.set(null, false);
+ return;
+ }
+
+ Object oldObject = ao.get();
+ if (oldObject != null) { // first try with previous object's type
+ Object value;
+ try {
+ value = gson.fromJson(object, oldObject.getClass());
+ ao.set(value, false);
+ return;
+ } catch (Exception e) {
+ // no luck
+ }
+ }
+
+ // Generic java object type for json.
+ Map<String, Object> value = gson.fromJson(object,
+ new TypeToken<Map<String, Object>>() {
+ }.getType());
+ ao.set(value, false);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index 4284cf1..9827a45 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -38,6 +38,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
private static final org.apache.thrift.protocol.TField PARAGRAPH_TEXT_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphText", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField CONFIG_FIELD_DESC = new org.apache.thrift.protocol.TField("config", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField GUI_FIELD_DESC = new org.apache.thrift.protocol.TField("gui", org.apache.thrift.protocol.TType.STRING, (short)5);
+ private static final org.apache.thrift.protocol.TField RUNNERS_FIELD_DESC = new org.apache.thrift.protocol.TField("runners", org.apache.thrift.protocol.TType.STRING, (short)6);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -50,6 +51,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
public String paragraphText; // required
public String config; // required
public String gui; // required
+ public String runners; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -57,7 +59,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
PARAGRAPH_TITLE((short)2, "paragraphTitle"),
PARAGRAPH_TEXT((short)3, "paragraphText"),
CONFIG((short)4, "config"),
- GUI((short)5, "gui");
+ GUI((short)5, "gui"),
+ RUNNERS((short)6, "runners");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -82,6 +85,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return CONFIG;
case 5: // GUI
return GUI;
+ case 6: // RUNNERS
+ return RUNNERS;
default:
return null;
}
@@ -135,6 +140,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.GUI, new org.apache.thrift.meta_data.FieldMetaData("gui", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.RUNNERS, new org.apache.thrift.meta_data.FieldMetaData("runners", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteInterpreterContext.class, metaDataMap);
}
@@ -147,7 +154,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
String paragraphTitle,
String paragraphText,
String config,
- String gui)
+ String gui,
+ String runners)
{
this();
this.paragraphId = paragraphId;
@@ -155,6 +163,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
this.paragraphText = paragraphText;
this.config = config;
this.gui = gui;
+ this.runners = runners;
}
/**
@@ -176,6 +185,9 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (other.isSetGui()) {
this.gui = other.gui;
}
+ if (other.isSetRunners()) {
+ this.runners = other.runners;
+ }
}
public RemoteInterpreterContext deepCopy() {
@@ -189,6 +201,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
this.paragraphText = null;
this.config = null;
this.gui = null;
+ this.runners = null;
}
public String getParagraphId() {
@@ -311,6 +324,30 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
}
}
+ public String getRunners() {
+ return this.runners;
+ }
+
+ public RemoteInterpreterContext setRunners(String runners) {
+ this.runners = runners;
+ return this;
+ }
+
+ public void unsetRunners() {
+ this.runners = null;
+ }
+
+ /** Returns true if field runners is set (has been assigned a value) and false otherwise */
+ public boolean isSetRunners() {
+ return this.runners != null;
+ }
+
+ public void setRunnersIsSet(boolean value) {
+ if (!value) {
+ this.runners = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case PARAGRAPH_ID:
@@ -353,6 +390,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
}
break;
+ case RUNNERS:
+ if (value == null) {
+ unsetRunners();
+ } else {
+ setRunners((String)value);
+ }
+ break;
+
}
}
@@ -373,6 +418,9 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
case GUI:
return getGui();
+ case RUNNERS:
+ return getRunners();
+
}
throw new IllegalStateException();
}
@@ -394,6 +442,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return isSetConfig();
case GUI:
return isSetGui();
+ case RUNNERS:
+ return isSetRunners();
}
throw new IllegalStateException();
}
@@ -456,6 +506,15 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return false;
}
+ boolean this_present_runners = true && this.isSetRunners();
+ boolean that_present_runners = true && that.isSetRunners();
+ if (this_present_runners || that_present_runners) {
+ if (!(this_present_runners && that_present_runners))
+ return false;
+ if (!this.runners.equals(that.runners))
+ return false;
+ }
+
return true;
}
@@ -522,6 +581,16 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetRunners()).compareTo(typedOther.isSetRunners());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRunners()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.runners, typedOther.runners);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -581,6 +650,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
sb.append(this.gui);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("runners:");
+ if (this.runners == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.runners);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -664,6 +741,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 6: // RUNNERS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.runners = iprot.readString();
+ struct.setRunnersIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -704,6 +789,11 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
oprot.writeString(struct.gui);
oprot.writeFieldEnd();
}
+ if (struct.runners != null) {
+ oprot.writeFieldBegin(RUNNERS_FIELD_DESC);
+ oprot.writeString(struct.runners);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -737,7 +827,10 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (struct.isSetGui()) {
optionals.set(4);
}
- oprot.writeBitSet(optionals, 5);
+ if (struct.isSetRunners()) {
+ optionals.set(5);
+ }
+ oprot.writeBitSet(optionals, 6);
if (struct.isSetParagraphId()) {
oprot.writeString(struct.paragraphId);
}
@@ -753,12 +846,15 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (struct.isSetGui()) {
oprot.writeString(struct.gui);
}
+ if (struct.isSetRunners()) {
+ oprot.writeString(struct.runners);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterContext struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(5);
+ BitSet incoming = iprot.readBitSet(6);
if (incoming.get(0)) {
struct.paragraphId = iprot.readString();
struct.setParagraphIdIsSet(true);
@@ -779,6 +875,10 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
struct.gui = iprot.readString();
struct.setGuiIsSet(true);
}
+ if (incoming.get(5)) {
+ struct.runners = iprot.readString();
+ struct.setRunnersIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
new file mode 100644
index 0000000..b44ea96
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -0,0 +1,502 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
+
+ private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)1);
+ private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new RemoteInterpreterEventStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new RemoteInterpreterEventTupleSchemeFactory());
+ }
+
+ /**
+ *
+ * @see RemoteInterpreterEventType
+ */
+ public RemoteInterpreterEventType type; // required
+ public String data; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ /**
+ *
+ * @see RemoteInterpreterEventType
+ */
+ TYPE((short)1, "type"),
+ DATA((short)2, "data");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // TYPE
+ return TYPE;
+ case 2: // DATA
+ return DATA;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, RemoteInterpreterEventType.class)));
+ tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteInterpreterEvent.class, metaDataMap);
+ }
+
+ public RemoteInterpreterEvent() {
+ }
+
+ public RemoteInterpreterEvent(
+ RemoteInterpreterEventType type,
+ String data)
+ {
+ this();
+ this.type = type;
+ this.data = data;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public RemoteInterpreterEvent(RemoteInterpreterEvent other) {
+ if (other.isSetType()) {
+ this.type = other.type;
+ }
+ if (other.isSetData()) {
+ this.data = other.data;
+ }
+ }
+
+ public RemoteInterpreterEvent deepCopy() {
+ return new RemoteInterpreterEvent(this);
+ }
+
+ @Override
+ public void clear() {
+ this.type = null;
+ this.data = null;
+ }
+
+ /**
+ *
+ * @see RemoteInterpreterEventType
+ */
+ public RemoteInterpreterEventType getType() {
+ return this.type;
+ }
+
+ /**
+ *
+ * @see RemoteInterpreterEventType
+ */
+ public RemoteInterpreterEvent setType(RemoteInterpreterEventType type) {
+ this.type = type;
+ return this;
+ }
+
+ public void unsetType() {
+ this.type = null;
+ }
+
+ /** Returns true if field type is set (has been assigned a value) and false otherwise */
+ public boolean isSetType() {
+ return this.type != null;
+ }
+
+ public void setTypeIsSet(boolean value) {
+ if (!value) {
+ this.type = null;
+ }
+ }
+
+ public String getData() {
+ return this.data;
+ }
+
+ public RemoteInterpreterEvent setData(String data) {
+ this.data = data;
+ return this;
+ }
+
+ public void unsetData() {
+ this.data = null;
+ }
+
+ /** Returns true if field data is set (has been assigned a value) and false otherwise */
+ public boolean isSetData() {
+ return this.data != null;
+ }
+
+ public void setDataIsSet(boolean value) {
+ if (!value) {
+ this.data = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case TYPE:
+ if (value == null) {
+ unsetType();
+ } else {
+ setType((RemoteInterpreterEventType)value);
+ }
+ break;
+
+ case DATA:
+ if (value == null) {
+ unsetData();
+ } else {
+ setData((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case TYPE:
+ return getType();
+
+ case DATA:
+ return getData();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TYPE:
+ return isSetType();
+ case DATA:
+ return isSetData();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof RemoteInterpreterEvent)
+ return this.equals((RemoteInterpreterEvent)that);
+ return false;
+ }
+
+ public boolean equals(RemoteInterpreterEvent that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_type = true && this.isSetType();
+ boolean that_present_type = true && that.isSetType();
+ if (this_present_type || that_present_type) {
+ if (!(this_present_type && that_present_type))
+ return false;
+ if (!this.type.equals(that.type))
+ return false;
+ }
+
+ boolean this_present_data = true && this.isSetData();
+ boolean that_present_data = true && that.isSetData();
+ if (this_present_data || that_present_data) {
+ if (!(this_present_data && that_present_data))
+ return false;
+ if (!this.data.equals(that.data))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(RemoteInterpreterEvent other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ RemoteInterpreterEvent typedOther = (RemoteInterpreterEvent)other;
+
+ lastComparison = Boolean.valueOf(isSetType()).compareTo(typedOther.isSetType());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetType()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, typedOther.type);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetData()).compareTo(typedOther.isSetData());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetData()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.data, typedOther.data);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("RemoteInterpreterEvent(");
+ boolean first = true;
+
+ sb.append("type:");
+ if (this.type == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.type);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("data:");
+ if (this.data == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.data);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class RemoteInterpreterEventStandardSchemeFactory implements SchemeFactory {
+ public RemoteInterpreterEventStandardScheme getScheme() {
+ return new RemoteInterpreterEventStandardScheme();
+ }
+ }
+
+ private static class RemoteInterpreterEventStandardScheme extends StandardScheme<RemoteInterpreterEvent> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // TYPE
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.type = RemoteInterpreterEventType.findByValue(iprot.readI32());
+ struct.setTypeIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // DATA
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.data = iprot.readString();
+ struct.setDataIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.type != null) {
+ oprot.writeFieldBegin(TYPE_FIELD_DESC);
+ oprot.writeI32(struct.type.getValue());
+ oprot.writeFieldEnd();
+ }
+ if (struct.data != null) {
+ oprot.writeFieldBegin(DATA_FIELD_DESC);
+ oprot.writeString(struct.data);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class RemoteInterpreterEventTupleSchemeFactory implements SchemeFactory {
+ public RemoteInterpreterEventTupleScheme getScheme() {
+ return new RemoteInterpreterEventTupleScheme();
+ }
+ }
+
+ private static class RemoteInterpreterEventTupleScheme extends TupleScheme<RemoteInterpreterEvent> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetType()) {
+ optionals.set(0);
+ }
+ if (struct.isSetData()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.isSetType()) {
+ oprot.writeI32(struct.type.getValue());
+ }
+ if (struct.isSetData()) {
+ oprot.writeString(struct.data);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.type = RemoteInterpreterEventType.findByValue(iprot.readI32());
+ struct.setTypeIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.data = iprot.readString();
+ struct.setDataIsSet(true);
+ }
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
new file mode 100644
index 0000000..abbc68f
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -0,0 +1,54 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
+ NO_OP(1),
+ ANGULAR_OBJECT_ADD(2),
+ ANGULAR_OBJECT_UPDATE(3),
+ ANGULAR_OBJECT_REMOVE(4),
+ RUN_INTERPRETER_CONTEXT_RUNNER(5);
+
+ private final int value;
+
+ private RemoteInterpreterEventType(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find a the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static RemoteInterpreterEventType findByValue(int value) {
+ switch (value) {
+ case 1:
+ return NO_OP;
+ case 2:
+ return ANGULAR_OBJECT_ADD;
+ case 3:
+ return ANGULAR_OBJECT_UPDATE;
+ case 4:
+ return ANGULAR_OBJECT_REMOVE;
+ case 5:
+ return RUN_INTERPRETER_CONTEXT_RUNNER;
+ default:
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index a64395f..398cebf 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -54,6 +54,10 @@ public class RemoteInterpreterService {
public String getStatus(String jobId) throws org.apache.thrift.TException;
+ public RemoteInterpreterEvent getEvent() throws org.apache.thrift.TException;
+
+ public void angularObjectUpdate(String name, String object) throws org.apache.thrift.TException;
+
}
public interface AsyncIface {
@@ -78,6 +82,10 @@ public class RemoteInterpreterService {
public void getStatus(String jobId, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getStatus_call> resultHandler) throws org.apache.thrift.TException;
+ public void getEvent(org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getEvent_call> resultHandler) throws org.apache.thrift.TException;
+
+ public void angularObjectUpdate(String name, String object, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.angularObjectUpdate_call> resultHandler) throws org.apache.thrift.TException;
+
}
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
@@ -321,6 +329,49 @@ public class RemoteInterpreterService {
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getStatus failed: unknown result");
}
+ public RemoteInterpreterEvent getEvent() throws org.apache.thrift.TException
+ {
+ send_getEvent();
+ return recv_getEvent();
+ }
+
+ public void send_getEvent() throws org.apache.thrift.TException
+ {
+ getEvent_args args = new getEvent_args();
+ sendBase("getEvent", args);
+ }
+
+ public RemoteInterpreterEvent recv_getEvent() throws org.apache.thrift.TException
+ {
+ getEvent_result result = new getEvent_result();
+ receiveBase(result, "getEvent");
+ if (result.isSetSuccess()) {
+ return result.success;
+ }
+ throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getEvent failed: unknown result");
+ }
+
+ public void angularObjectUpdate(String name, String object) throws org.apache.thrift.TException
+ {
+ send_angularObjectUpdate(name, object);
+ recv_angularObjectUpdate();
+ }
+
+ public void send_angularObjectUpdate(String name, String object) throws org.apache.thrift.TException
+ {
+ angularObjectUpdate_args args = new angularObjectUpdate_args();
+ args.setName(name);
+ args.setObject(object);
+ sendBase("angularObjectUpdate", args);
+ }
+
+ public void recv_angularObjectUpdate() throws org.apache.thrift.TException
+ {
+ angularObjectUpdate_result result = new angularObjectUpdate_result();
+ receiveBase(result, "angularObjectUpdate");
+ return;
+ }
+
}
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -677,6 +728,70 @@ public class RemoteInterpreterService {
}
}
+ public void getEvent(org.apache.thrift.async.AsyncMethodCallback<getEvent_call> resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ getEvent_call method_call = new getEvent_call(resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class getEvent_call extends org.apache.thrift.async.TAsyncMethodCall {
+ public getEvent_call(org.apache.thrift.async.AsyncMethodCallback<getEvent_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getEvent", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ getEvent_args args = new getEvent_args();
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public RemoteInterpreterEvent getResult() throws org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ return (new Client(prot)).recv_getEvent();
+ }
+ }
+
+ public void angularObjectUpdate(String name, String object, org.apache.thrift.async.AsyncMethodCallback<angularObjectUpdate_call> resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ angularObjectUpdate_call method_call = new angularObjectUpdate_call(name, object, resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class angularObjectUpdate_call extends org.apache.thrift.async.TAsyncMethodCall {
+ private String name;
+ private String object;
+ public angularObjectUpdate_call(String name, String object, org.apache.thrift.async.AsyncMethodCallback<angularObjectUpdate_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ this.name = name;
+ this.object = object;
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("angularObjectUpdate", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ angularObjectUpdate_args args = new angularObjectUpdate_args();
+ args.setName(name);
+ args.setObject(object);
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public void getResult() throws org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ (new Client(prot)).recv_angularObjectUpdate();
+ }
+ }
+
}
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@@ -700,6 +815,8 @@ public class RemoteInterpreterService {
processMap.put("completion", new completion());
processMap.put("shutdown", new shutdown());
processMap.put("getStatus", new getStatus());
+ processMap.put("getEvent", new getEvent());
+ processMap.put("angularObjectUpdate", new angularObjectUpdate());
return processMap;
}
@@ -904,6 +1021,46 @@ public class RemoteInterpreterService {
}
}
+ public static class getEvent<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getEvent_args> {
+ public getEvent() {
+ super("getEvent");
+ }
+
+ public getEvent_args getEmptyArgsInstance() {
+ return new getEvent_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public getEvent_result getResult(I iface, getEvent_args args) throws org.apache.thrift.TException {
+ getEvent_result result = new getEvent_result();
+ result.success = iface.getEvent();
+ return result;
+ }
+ }
+
+ public static class angularObjectUpdate<I extends Iface> extends org.apache.thrift.ProcessFunction<I, angularObjectUpdate_args> {
+ public angularObjectUpdate() {
+ super("angularObjectUpdate");
+ }
+
+ public angularObjectUpdate_args getEmptyArgsInstance() {
+ return new angularObjectUpdate_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public angularObjectUpdate_result getResult(I iface, angularObjectUpdate_args args) throws org.apache.thrift.TException {
+ angularObjectUpdate_result result = new angularObjectUpdate_result();
+ iface.angularObjectUpdate(args.name, args.object);
+ return result;
+ }
+ }
+
}
public static class createInterpreter_args implements org.apache.thrift.TBase<createInterpreter_args, createInterpreter_args._Fields>, java.io.Serializable, Cloneable {
@@ -8171,4 +8328,1309 @@ public class RemoteInterpreterService {
}
+ public static class getEvent_args implements org.apache.thrift.TBase<getEvent_args, getEvent_args._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getEvent_args");
+
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new getEvent_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new getEvent_argsTupleSchemeFactory());
+ }
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getEvent_args.class, metaDataMap);
+ }
+
+ public getEvent_args() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public getEvent_args(getEvent_args other) {
+ }
+
+ public getEvent_args deepCopy() {
+ return new getEvent_args(this);
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof getEvent_args)
+ return this.equals((getEvent_args)that);
+ return false;
+ }
+
+ public boolean equals(getEvent_args that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(getEvent_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ getEvent_args typedOther = (getEvent_args)other;
+
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("getEvent_args(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class getEvent_argsStandardSchemeFactory implements SchemeFactory {
+ public getEvent_argsStandardScheme getScheme() {
+ return new getEvent_argsStandardScheme();
+ }
+ }
+
+ private static class getEvent_argsStandardScheme extends StandardScheme<getEvent_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, getEvent_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, getEvent_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class getEvent_argsTupleSchemeFactory implements SchemeFactory {
+ public getEvent_argsTupleScheme getScheme() {
+ return new getEvent_argsTupleScheme();
+ }
+ }
+
+ private static class getEvent_argsTupleScheme extends TupleScheme<getEvent_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, getEvent_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, getEvent_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ }
+ }
+
+ }
+
+ public static class getEvent_result implements org.apache.thrift.TBase<getEvent_result, getEvent_result._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getEvent_result");
+
+ private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new getEvent_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new getEvent_resultTupleSchemeFactory());
+ }
+
+ public RemoteInterpreterEvent success; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ SUCCESS((short)0, "success");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 0: // SUCCESS
+ return SUCCESS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, RemoteInterpreterEvent.class)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getEvent_result.class, metaDataMap);
+ }
+
+ public getEvent_result() {
+ }
+
+ public getEvent_result(
+ RemoteInterpreterEvent success)
+ {
+ this();
+ this.success = success;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public getEvent_result(getEvent_result other) {
+ if (other.isSetSuccess()) {
+ this.success = new RemoteInterpreterEvent(other.success);
+ }
+ }
+
+ public getEvent_result deepCopy() {
+ return new getEvent_result(this);
+ }
+
+ @Override
+ public void clear() {
+ this.success = null;
+ }
+
+ public RemoteInterpreterEvent getSuccess() {
+ return this.success;
+ }
+
+ public getEvent_result setSuccess(RemoteInterpreterEvent success) {
+ this.success = success;
+ return this;
+ }
+
+ public void unsetSuccess() {
+ this.success = null;
+ }
+
+ /** Returns true if field success is set (has been assigned a value) and false otherwise */
+ public boolean isSetSuccess() {
+ return this.success != null;
+ }
+
+ public void setSuccessIsSet(boolean value) {
+ if (!value) {
+ this.success = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case SUCCESS:
+ if (value == null) {
+ unsetSuccess();
+ } else {
+ setSuccess((RemoteInterpreterEvent)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return getSuccess();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case SUCCESS:
+ return isSetSuccess();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof getEvent_result)
+ return this.equals((getEvent_result)that);
+ return false;
+ }
+
+ public boolean equals(getEvent_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_success = true && this.isSetSuccess();
+ boolean that_present_success = true && that.isSetSuccess();
+ if (this_present_success || that_present_success) {
+ if (!(this_present_success && that_present_success))
+ return false;
+ if (!this.success.equals(that.success))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(getEvent_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ getEvent_result typedOther = (getEvent_result)other;
+
+ lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetSuccess()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, typedOther.success);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("getEvent_result(");
+ boolean first = true;
+
+ sb.append("success:");
+ if (this.success == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.success);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ if (success != null) {
+ success.validate();
+ }
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class getEvent_resultStandardSchemeFactory implements SchemeFactory {
+ public getEvent_resultStandardScheme getScheme() {
+ return new getEvent_resultStandardScheme();
+ }
+ }
+
+ private static class getEvent_resultStandardScheme extends StandardScheme<getEvent_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, getEvent_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 0: // SUCCESS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.success = new RemoteInterpreterEvent();
+ struct.success.read(iprot);
+ struct.setSuccessIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, getEvent_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.success != null) {
+ oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+ struct.success.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class getEvent_resultTupleSchemeFactory implements SchemeFactory {
+ public getEvent_resultTupleScheme getScheme() {
+ return new getEvent_resultTupleScheme();
+ }
+ }
+
+ private static class getEvent_resultTupleScheme extends TupleScheme<getEvent_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, getEvent_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetSuccess()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetSuccess()) {
+ struct.success.write(oprot);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, getEvent_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.success = new RemoteInterpreterEvent();
+ struct.success.read(iprot);
+ struct.setSuccessIsSet(true);
+ }
+ }
+ }
+
+ }
+
+ public static class angularObjectUpdate_args implements org.apache.thrift.TBase<angularObjectUpdate_args, angularObjectUpdate_args._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("angularObjectUpdate_args");
+
+ private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField OBJECT_FIELD_DESC = new org.apache.thrift.protocol.TField("object", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new angularObjectUpdate_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new angularObjectUpdate_argsTupleSchemeFactory());
+ }
+
+ public String name; // required
+ public String object; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ NAME((short)1, "name"),
+ OBJECT((short)2, "object");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // NAME
+ return NAME;
+ case 2: // OBJECT
+ return OBJECT;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.OBJECT, new org.apache.thrift.meta_data.FieldMetaData("object", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(angularObjectUpdate_args.class, metaDataMap);
+ }
+
+ public angularObjectUpdate_args() {
+ }
+
+ public angularObjectUpdate_args(
+ String name,
+ String object)
+ {
+ this();
+ this.name = name;
+ this.object = object;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public angularObjectUpdate_args(angularObjectUpdate_args other) {
+ if (other.isSetName()) {
+ this.name = other.name;
+ }
+ if (other.isSetObject()) {
+ this.object = other.object;
+ }
+ }
+
+ public angularObjectUpdate_args deepCopy() {
+ return new angularObjectUpdate_args(this);
+ }
+
+ @Override
+ public void clear() {
+ this.name = null;
+ this.object = null;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public angularObjectUpdate_args setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public void unsetName() {
+ this.name = null;
+ }
+
+ /** Returns true if field name is set (has been assigned a value) and false otherwise */
+ public boolean isSetName() {
+ return this.name != null;
+ }
+
+ public void setNameIsSet(boolean value) {
+ if (!value) {
+ this.name = null;
+ }
+ }
+
+ public String getObject() {
+ return this.object;
+ }
+
+ public angularObjectUpdate_args setObject(String object) {
+ this.object = object;
+ return this;
+ }
+
+ public void unsetObject() {
+ this.object = null;
+ }
+
+ /** Returns true if field object is set (has been assigned a value) and false otherwise */
+ public boolean isSetObject() {
+ return this.object != null;
+ }
+
+ public void setObjectIsSet(boolean value) {
+ if (!value) {
+ this.object = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case NAME:
+ if (value == null) {
+ unsetName();
+ } else {
+ setName((String)value);
+ }
+ break;
+
+ case OBJECT:
+ if (value == null) {
+ unsetObject();
+ } else {
+ setObject((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case NAME:
+ return getName();
+
+ case OBJECT:
+ return getObject();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case NAME:
+ return isSetName();
+ case OBJECT:
+ return isSetObject();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof angularObjectUpdate_args)
+ return this.equals((angularObjectUpdate_args)that);
+ return false;
+ }
+
+ public boolean equals(angularObjectUpdate_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_name = true && this.isSetName();
+ boolean that_present_name = true && that.isSetName();
+ if (this_present_name || that_present_name) {
+ if (!(this_present_name && that_present_name))
+ return false;
+ if (!this.name.equals(that.name))
+ return false;
+ }
+
+ boolean this_present_object = true && this.isSetObject();
+ boolean that_present_object = true && that.isSetObject();
+ if (this_present_object || that_present_object) {
+ if (!(this_present_object && that_present_object))
+ return false;
+ if (!this.object.equals(that.object))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(angularObjectUpdate_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ angularObjectUpdate_args typedOther = (angularObjectUpdate_args)other;
+
+ lastComparison = Boolean.valueOf(isSetName()).compareTo(typedOther.isSetName());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetName()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.name, typedOther.name);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetObject()).compareTo(typedOther.isSetObject());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetObject()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.object, typedOther.object);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("angularObjectUpdate_args(");
+ boolean first = true;
+
+ sb.append("name:");
+ if (this.name == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.name);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("object:");
+ if (this.object == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.object);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class angularObjectUpdate_argsStandardSchemeFactory implements SchemeFactory {
+ public angularObjectUpdate_argsStandardScheme getScheme() {
+ return new angularObjectUpdate_argsStandardScheme();
+ }
+ }
+
+ private static class angularObjectUpdate_argsStandardScheme extends StandardScheme<angularObjectUpdate_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, angularObjectUpdate_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // NAME
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.name = iprot.readString();
+ struct.setNameIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // OBJECT
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.object = iprot.readString();
+ struct.setObjectIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, angularObjectUpdate_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.name != null) {
+ oprot.writeFieldBegin(NAME_FIELD_DESC);
+ oprot.writeString(struct.name);
+ oprot.writeFieldEnd();
+ }
+ if (struct.object != null) {
+ oprot.writeFieldBegin(OBJECT_FIELD_DESC);
+ oprot.writeString(struct.object);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class angularObjectUpdate_argsTupleSchemeFactory implements SchemeFactory {
+ public angularObjectUpdate_argsTupleScheme getScheme() {
+ return new angularObjectUpdate_argsTupleScheme();
+ }
+ }
+
+ private static class angularObjectUpdate_argsTupleScheme extends TupleScheme<angularObjectUpdate_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, angularObjectUpdate_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetName()) {
+ optionals.set(0);
+ }
+ if (struct.isSetObject()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.isSetName()) {
+ oprot.writeString(struct.name);
+ }
+ if (struct.isSetObject()) {
+ oprot.writeString(struct.object);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, angularObjectUpdate_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.name = iprot.readString();
+ struct.setNameIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.object = iprot.readString();
+ struct.setObjectIsSet(true);
+ }
+ }
+ }
+
+ }
+
+ public static class angularObjectUpdate_result implements org.apache.thrift.TBase<angularObjectUpdate_result, angularObjectUpdate_result._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("angularObjectUpdate_result");
+
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new angularObjectUpdate_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new angularObjectUpdate_resultTupleSchemeFactory());
+ }
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(angularObjectUpdate_result.class, metaDataMap);
+ }
+
+ public angularObjectUpdate_result() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public angularObjectUpdate_result(angularObjectUpdate_result other) {
+ }
+
+ public angularObjectUpdate_result deepCopy() {
+ return new angularObjectUpdate_result(this);
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof angularObjectUpdate_result)
+ return this.equals((angularObjectUpdate_result)that);
+ return false;
+ }
+
+ public boolean equals(angularObjectUpdate_result that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(angularObjectUpdate_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ angularObjectUpdate_result typedOther = (angularObjectUpdate_result)other;
+
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("angularObjectUpdate_result(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class angularObjectUpdate_resultStandardSchemeFactory implements SchemeFactory {
+ public angularObjectUpdate_resultStandardScheme getScheme() {
+ return new angularObjectUpdate_resultStandardScheme();
+ }
+ }
+
+ private static class angularObjectUpdate_resultStandardScheme extends StandardScheme<angularObjectUpdate_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, angularObjectUpdate_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, angularObjectUpdate_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class angularObjectUpdate_resultTupleSchemeFactory implements SchemeFactory {
+ public angularObjectUpdate_resultTupleScheme getScheme() {
+ return new angularObjectUpdate_resultTupleScheme();
+ }
+ }
+
+ private static class angularObjectUpdate_resultTupleScheme extends TupleScheme<angularObjectUpdate_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, angularObjectUpdate_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, angularObjectUpdate_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ }
+ }
+
+ }
+
}