You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2016/06/29 09:45:50 UTC

zeppelin git commit: ZEPPELIN-1067 Change ordering mechanism for interpreters

Repository: zeppelin
Updated Branches:
  refs/heads/master d4e3d8cf0 -> 2a363cd6d


ZEPPELIN-1067 Change ordering mechanism for interpreters

### What is this PR for?
Removing dependency of interpreterClassList in order to add custom interpreter easily. For now, we should add some configuration for loading custom interpreter. This will enable users to add custom interpreter without changing interpreterClassList

### What type of PR is it?
[Improvement]

### Todos
* [x] - Remove dependency of interprterClassList

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1067

### How should this be tested?
That shouldn't change any thing on user level.

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jongyoul Lee <jo...@gmail.com>

Closes #1095 from jongyoul/ZEPPELIN-1067 and squashes the following commits:

2848e63 [Jongyoul Lee] Handled in case that two interpreter setting have same interpreter group but different interpreter name
7915311 [Jongyoul Lee] Fixed style
78a213b [Jongyoul Lee] Adjusted some interpreter groups having multiple interpreters
0d0b4c5 [Jongyoul Lee] Fixed NoteInterpreterTest not to use an option for ordering group
05ad9bb [Jongyoul Lee] Added interpreter group not in a interpret_group_order
79da385 [Jongyoul Lee] Added new configuration for ordering group Removed codes using interpreterClassList Added option to set default interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/2a363cd6
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/2a363cd6
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/2a363cd6

Branch: refs/heads/master
Commit: 2a363cd6d88a59bc9016fe3442878c3a64c6eade
Parents: d4e3d8c
Author: Jongyoul Lee <jo...@gmail.com>
Authored: Tue Jun 28 00:21:31 2016 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Wed Jun 29 18:45:36 2016 +0900

----------------------------------------------------------------------
 conf/zeppelin-site.xml.template                 |   6 +
 .../zeppelin/ignite/IgniteInterpreter.java      |   1 +
 .../src/main/resources/interpreter-setting.json |   1 +
 .../src/main/resources/interpreter-setting.json |   1 +
 .../zeppelin/interpreter/Interpreter.java       |  32 ++-
 .../zeppelin/conf/ZeppelinConfiguration.java    |   3 +
 .../interpreter/InterpreterFactory.java         | 198 ++++++++++---------
 .../notebook/NoteInterpreterLoaderTest.java     |   3 +-
 8 files changed, 144 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a363cd6/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 6c1ff8a..3fc1b4a 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -183,6 +183,12 @@
 </property>
 
 <property>
+  <name>zeppelin.interpreter.group.order</name>
+  <value>"spark,md,angular,sh,livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,scalding,jdbc,hbase</value>
+  <description></description>
+</property>
+
+<property>
   <name>zeppelin.interpreter.connect.timeout</name>
   <value>30000</value>
   <description>Interpreter process connect timeout in msec.</description>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a363cd6/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java
----------------------------------------------------------------------
diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java
index 8368195..fa5a079 100644
--- a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java
+++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java
@@ -80,6 +80,7 @@ public class IgniteInterpreter extends Interpreter {
             "ignite",
             "ignite",
             IgniteInterpreter.class.getName(),
+            true,
             new InterpreterPropertyBuilder()
                     .add(IGNITE_ADDRESSES, "127.0.0.1:47500..47509",
                             "Coma separated list of addresses "

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a363cd6/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json
index 7ae435f..232bcb6 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -3,6 +3,7 @@
     "group": "livy",
     "name": "spark",
     "className": "org.apache.zeppelin.livy.LivySparkInterpreter",
+    "defaultInterpreter": true,
     "properties": {
       "zeppelin.livy.url": {
         "envName": "ZEPPELIN_LIVY_HOST_URL",

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a363cd6/spark/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/interpreter-setting.json b/spark/src/main/resources/interpreter-setting.json
index d46801b..4902baf 100644
--- a/spark/src/main/resources/interpreter-setting.json
+++ b/spark/src/main/resources/interpreter-setting.json
@@ -3,6 +3,7 @@
     "group": "spark",
     "name": "spark",
     "className": "org.apache.zeppelin.spark.SparkInterpreter",
+    "defaultInterpreter": true,
     "properties": {
       "spark.executor.memory": {
         "envName": null,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a363cd6/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
index c52bb4a..82e1f2b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -249,15 +249,22 @@ public abstract class Interpreter {
     private String name;
     //@SerializedName("interpreterClassName")
     private String className;
+    private boolean defaultInterpreter;
     private Map<String, InterpreterProperty> properties;
     private String path;
 
     public RegisteredInterpreter(String name, String group, String className,
         Map<String, InterpreterProperty> properties) {
+      this(name, group, className, false, properties);
+    }
+
+    public RegisteredInterpreter(String name, String group, String className,
+        boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
       super();
       this.name = name;
       this.group = group;
       this.className = className;
+      this.defaultInterpreter = defaultInterpreter;
       this.properties = properties;
     }
 
@@ -273,6 +280,14 @@ public abstract class Interpreter {
       return className;
     }
 
+    public boolean isDefaultInterpreter() {
+      return defaultInterpreter;
+    }
+
+    public void setDefaultInterpreter(boolean defaultInterpreter) {
+      this.defaultInterpreter = defaultInterpreter;
+    }
+
     public Map<String, InterpreterProperty> getProperties() {
       return properties;
     }
@@ -306,16 +321,27 @@ public abstract class Interpreter {
   }
 
   public static void register(String name, String group, String className) {
-    register(name, group, className, new HashMap<String, InterpreterProperty>());
+    register(name, group, className, false, new HashMap<String, InterpreterProperty>());
+  }
+
+  public static void register(String name, String group, String className,
+      Map<String, InterpreterProperty> properties) {
+    register(name, group, className, false, properties);
+  }
+
+  public static void register(String name, String group, String className,
+      boolean defaultInterpreter) {
+    register(name, group, className, defaultInterpreter,
+        new HashMap<String, InterpreterProperty>());
   }
 
   @Deprecated
   public static void register(String name, String group, String className,
-                              Map<String, InterpreterProperty> properties) {
+      boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
     logger.error("Static initialization is deprecated. You should change it to use " +
                      "interpreter-setting.json in your jar or " +
                      "interpreter/{interpreter}/interpreter-setting.json");
-    register(new RegisteredInterpreter(name, group, className, properties));
+    register(new RegisteredInterpreter(name, group, className, defaultInterpreter, properties));
   }
 
   public static void register(RegisteredInterpreter registeredInterpreter) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a363cd6/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 0a7b8c0..57de4a1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -515,6 +515,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
     ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
     ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
+    ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
+        + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
+        + "scalding,jdbc,hbase"),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
     ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
     // use specified notebook (id) as homescreen

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a363cd6/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index aeb7818..ca9e471 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -66,7 +66,9 @@ public class InterpreterFactory implements InterpreterGroupFactory {
       .synchronizedMap(new HashMap<String, URLClassLoader>());
 
   private ZeppelinConfiguration conf;
+  @Deprecated
   String[] interpreterClassList;
+  String[] interpreterGroupOrderList;
 
   private Map<String, InterpreterSetting> interpreterSettings =
       new HashMap<String, InterpreterSetting>();
@@ -106,6 +108,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
     this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
     String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
     interpreterClassList = replsConf.split(",");
+    String groupOrder = conf.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER);
+    interpreterGroupOrderList = groupOrder.split(",");
 
     GsonBuilder builder = new GsonBuilder();
     builder.setPrettyPrinting();
@@ -178,37 +182,42 @@ public class InterpreterFactory implements InterpreterGroupFactory {
 
         for (String k : Interpreter.registeredInterpreters.keySet()) {
           RegisteredInterpreter info = Interpreter.registeredInterpreters.get(k);
+          String group = info.getGroup();
 
-          if (!groupClassNameMap.containsKey(info.getGroup())) {
-            groupClassNameMap.put(info.getGroup(), new LinkedList<RegisteredInterpreter>());
+          if (!groupClassNameMap.containsKey(group)) {
+            groupClassNameMap.put(group, new LinkedList<RegisteredInterpreter>());
+            groupClassNameMap.get(group).add(info);
+          } else {
+            if (info.isDefaultInterpreter()) {
+              groupClassNameMap.get(group).add(0, info);
+            } else {
+              groupClassNameMap.get(group).add(info);
+            }
           }
-
-          groupClassNameMap.get(info.getGroup()).add(info);
         }
 
-        for (String className : interpreterClassList) {
-          for (String groupName : groupClassNameMap.keySet()) {
-            List<RegisteredInterpreter> infos = groupClassNameMap.get(groupName);
-
-            boolean found = false;
+        for (String groupName : interpreterGroupOrderList) {
+          List<RegisteredInterpreter> infos = groupClassNameMap.remove(groupName);
+          if (null != infos) {
             Properties p = new Properties();
             for (RegisteredInterpreter info : infos) {
-              if (found == false && info.getClassName().equals(className)) {
-                found = true;
-              }
-
-              for (String k : info.getProperties().keySet()) {
-                p.put(k, info.getProperties().get(k).getValue());
+              for (String key : info.getProperties().keySet()) {
+                p.put(key, info.getProperties().get(key).getValue());
               }
             }
+            add(groupName, groupName, new LinkedList<Dependency>(), defaultOption, p);
+          }
+        }
 
-            if (found) {
-              // add all interpreters in group
-              add(groupName, groupName, new LinkedList<Dependency>(), defaultOption, p);
-              groupClassNameMap.remove(groupName);
-              break;
+        for (String groupName : groupClassNameMap.keySet()) {
+          List<RegisteredInterpreter> infos = groupClassNameMap.get(groupName);
+          Properties p = new Properties();
+          for (RegisteredInterpreter info : infos) {
+            for (String key : info.getProperties().keySet()) {
+              p.put(key, info.getProperties().get(key).getValue());
             }
           }
+          add(groupName, groupName, new LinkedList<Dependency>(), defaultOption, p);
         }
       }
     }
@@ -430,15 +439,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
   }
 
   public List<RegisteredInterpreter> getRegisteredInterpreterList() {
-    List<RegisteredInterpreter> registeredInterpreters = new LinkedList<RegisteredInterpreter>();
-
-    for (String className : interpreterClassList) {
-      RegisteredInterpreter ri = Interpreter.findRegisteredInterpreterByClassName(className);
-      if (ri != null) {
-        registeredInterpreters.add(ri);
-      }
-    }
-    return registeredInterpreters;
+    return new ArrayList<>(Interpreter.registeredInterpreters.values());
   }
 
   /**
@@ -454,19 +455,18 @@ public class InterpreterFactory implements InterpreterGroupFactory {
       InterpreterOption option, Properties properties)
       throws InterpreterException, IOException, RepositoryException {
     synchronized (interpreterSettings) {
-
-      List<InterpreterSetting.InterpreterInfo> interpreterInfos =
-          new LinkedList<InterpreterSetting.InterpreterInfo>();
-
-      for (String className : interpreterClassList) {
-        for (RegisteredInterpreter registeredInterpreter :
-            Interpreter.registeredInterpreters.values()) {
-          if (registeredInterpreter.getGroup().equals(groupName)) {
-            if (registeredInterpreter.getClassName().equals(className)) {
-              interpreterInfos.add(
-                  new InterpreterSetting.InterpreterInfo(
-                      className, registeredInterpreter.getName()));
-            }
+      List<InterpreterSetting.InterpreterInfo> interpreterInfos = new ArrayList<>();
+
+      for (RegisteredInterpreter registeredInterpreter :
+          Interpreter.registeredInterpreters.values()) {
+        if (registeredInterpreter.getGroup().equals(groupName)) {
+          if (registeredInterpreter.isDefaultInterpreter()) {
+            interpreterInfos.add(0,
+                new InterpreterSetting.InterpreterInfo(
+                    registeredInterpreter.getClassName(), registeredInterpreter.getName()));
+          } else {
+            interpreterInfos.add(new InterpreterSetting.InterpreterInfo(
+                registeredInterpreter.getClassName(), registeredInterpreter.getName()));
           }
         }
       }
@@ -575,38 +575,38 @@ public class InterpreterFactory implements InterpreterGroupFactory {
 
     logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId);
 
-    for (String className : interpreterClassList) {
-      Set<String> keys = Interpreter.registeredInterpreters.keySet();
-      for (String intName : keys) {
-        RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
-        if (info.getClassName().equals(className)
-            && info.getGroup().equals(groupName)) {
-          Interpreter intp;
-
-          if (option.isRemote()) {
-            intp = createRemoteRepl(info.getPath(),
-                key,
-                info.getClassName(),
-                properties,
-                interpreterSetting.id());
-          } else {
-            intp = createRepl(info.getPath(),
-                info.getClassName(),
-                properties);
-          }
+    Set<String> keys = Interpreter.registeredInterpreters.keySet();
+    for (String intName : keys) {
+      RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
+      if (info.getGroup().equals(groupName)) {
+        Interpreter intp;
+
+        if (option.isRemote()) {
+          intp = createRemoteRepl(info.getPath(),
+              key,
+              info.getClassName(),
+              properties,
+              interpreterSetting.id());
+        } else {
+          intp = createRepl(info.getPath(),
+              info.getClassName(),
+              properties);
+        }
 
-          synchronized (interpreterGroup) {
-            List<Interpreter> interpreters = interpreterGroup.get(key);
-            if (interpreters == null) {
-              interpreters = new LinkedList<Interpreter>();
-              interpreterGroup.put(key, interpreters);
-            }
+        synchronized (interpreterGroup) {
+          List<Interpreter> interpreters = interpreterGroup.get(key);
+          if (interpreters == null) {
+            interpreters = new LinkedList<Interpreter>();
+            interpreterGroup.put(key, interpreters);
+          }
+          if (info.isDefaultInterpreter()) {
+            interpreters.add(0, intp);
+          } else {
             interpreters.add(intp);
           }
-          logger.info("Interpreter " + intp.getClassName() + " " + intp.hashCode() + " created");
-          intp.setInterpreterGroup(interpreterGroup);
-          break;
         }
+        logger.info("Interpreter " + intp.getClassName() + " " + intp.hashCode() + " created");
+        intp.setInterpreterGroup(interpreterGroup);
       }
     }
   }
@@ -644,38 +644,44 @@ public class InterpreterFactory implements InterpreterGroupFactory {
   public List<InterpreterSetting> get() {
     synchronized (interpreterSettings) {
       List<InterpreterSetting> orderedSettings = new LinkedList<InterpreterSetting>();
-      List<InterpreterSetting> settings = new LinkedList<InterpreterSetting>(
-          interpreterSettings.values());
-      Collections.sort(settings, new Comparator<InterpreterSetting>(){
+
+      Map<String, List<InterpreterSetting>> groupNameInterpreterSettingMap = new HashMap<>();
+      for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+        String groupName = interpreterSetting.getGroup();
+        if (!groupNameInterpreterSettingMap.containsKey(groupName)) {
+          groupNameInterpreterSettingMap.put(groupName, new ArrayList<InterpreterSetting>());
+        }
+        groupNameInterpreterSettingMap.get(groupName).add(interpreterSetting);
+      }
+
+      for (String groupName : interpreterGroupOrderList) {
+        List<InterpreterSetting> interpreterSettingList =
+            groupNameInterpreterSettingMap.remove(groupName);
+        if (null != interpreterSettingList) {
+          for (InterpreterSetting interpreterSetting : interpreterSettingList) {
+            orderedSettings.add(interpreterSetting);
+          }
+        }
+      }
+
+      List<InterpreterSetting> settings = new ArrayList<>();
+
+      for (List<InterpreterSetting> interpreterSettingList :
+          groupNameInterpreterSettingMap.values()) {
+        for (InterpreterSetting interpreterSetting : interpreterSettingList) {
+          settings.add(interpreterSetting);
+        }
+      }
+
+      Collections.sort(settings, new Comparator<InterpreterSetting>() {
         @Override
         public int compare(InterpreterSetting o1, InterpreterSetting o2) {
           return o1.getName().compareTo(o2.getName());
         }
       });
 
-      for (String className : interpreterClassList) {
-        for (InterpreterSetting setting : settings) {
-          for (InterpreterSetting orderedSetting : orderedSettings) {
-            if (orderedSetting.id().equals(setting.id())) {
-              continue;
-            }
-          }
-          for (InterpreterSetting.InterpreterInfo intp : setting.getInterpreterInfos()) {
-
-            if (className.equals(intp.getClassName())) {
-              boolean alreadyAdded = false;
-              for (InterpreterSetting st : orderedSettings) {
-                if (setting.id().equals(st.id())) {
-                  alreadyAdded = true;
-                }
-              }
-              if (alreadyAdded == false) {
-                orderedSettings.add(setting);
-              }
-            }
-          }
-        }
-      }
+      orderedSettings.addAll(settings);
+
       return orderedSettings;
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a363cd6/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
index f6abc9d..147d431 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
@@ -50,13 +50,12 @@ public class NoteInterpreterLoaderTest {
     new File(tmpDir, "conf").mkdirs();
 
     System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
-    System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter11,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
 
     conf = ZeppelinConfiguration.create();
 
     Interpreter.registeredInterpreters = Collections
         .synchronizedMap(new HashMap<String, Interpreter.RegisteredInterpreter>());
-    MockInterpreter1.register("mock1", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
+    MockInterpreter1.register("mock1", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1", true);
     MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11");
     MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");