You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/01/13 04:15:42 UTC
[1/2] incubator-zeppelin git commit: ZEPPELIN-546 Enables interpreter
library loading from maven repository
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master 0c42f4332 -> bc7155114
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java
new file mode 100644
index 0000000..ba8ee16
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.sonatype.aether.RepositorySystem;
+import org.sonatype.aether.RepositorySystemSession;
+import org.sonatype.aether.repository.RemoteRepository;
+import org.sonatype.aether.resolution.ArtifactResult;
+
+/**
+ * Abstract dependency resolver.
+ * Add new dependencies from mvn repo (at runtime) Zeppelin.
+ */
+public abstract class AbstractDependencyResolver {
+ protected RepositorySystem system = Booter.newRepositorySystem();
+ protected List<RemoteRepository> repos = new LinkedList<RemoteRepository>();
+ protected RepositorySystemSession session;
+
+ public AbstractDependencyResolver(String localRepoPath) {
+ session = Booter.newRepositorySystemSession(system, localRepoPath);
+ repos.add(Booter.newCentralRepository()); // add maven central
+ repos.add(Booter.newLocalRepository());
+ }
+
+ public void addRepo(String id, String url, boolean snapshot) {
+ synchronized (repos) {
+ delRepo(id);
+ RemoteRepository rr = new RemoteRepository(id, "default", url);
+ rr.setPolicy(snapshot, null);
+ repos.add(rr);
+ }
+ }
+
+ public RemoteRepository delRepo(String id) {
+ synchronized (repos) {
+ Iterator<RemoteRepository> it = repos.iterator();
+ if (it.hasNext()) {
+ RemoteRepository repo = it.next();
+ if (repo.getId().equals(id)) {
+ it.remove();
+ return repo;
+ }
+ }
+ }
+ return null;
+ }
+
+ public abstract List<ArtifactResult> getArtifactsWithDep(String dependency,
+ Collection<String> excludes) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java
new file mode 100644
index 0000000..7a487fa
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import java.io.File;
+
+import org.apache.maven.repository.internal.MavenRepositorySystemSession;
+import org.sonatype.aether.RepositorySystem;
+import org.sonatype.aether.RepositorySystemSession;
+import org.sonatype.aether.repository.LocalRepository;
+import org.sonatype.aether.repository.RemoteRepository;
+
+/**
+ * Manage mvn repository.
+ */
+public class Booter {
+ public static RepositorySystem newRepositorySystem() {
+ return RepositorySystemFactory.newRepositorySystem();
+ }
+
+ public static RepositorySystemSession newRepositorySystemSession(
+ RepositorySystem system, String localRepoPath) {
+ MavenRepositorySystemSession session = new MavenRepositorySystemSession();
+
+ // find homedir
+ String home = System.getenv("ZEPPELIN_HOME");
+ if (home == null) {
+ home = System.getProperty("zeppelin.home");
+ }
+ if (home == null) {
+ home = "..";
+ }
+
+ String path = home + "/" + localRepoPath;
+
+ LocalRepository localRepo =
+ new LocalRepository(new File(path).getAbsolutePath());
+ session.setLocalRepositoryManager(system.newLocalRepositoryManager(localRepo));
+
+ // session.setTransferListener(new ConsoleTransferListener());
+ // session.setRepositoryListener(new ConsoleRepositoryListener());
+
+ // uncomment to generate dirty trees
+ // session.setDependencyGraphTransformer( null );
+
+ return session;
+ }
+
+ public static RemoteRepository newCentralRepository() {
+ return new RemoteRepository("central", "default", "http://repo1.maven.org/maven2/");
+ }
+
+ public static RemoteRepository newLocalRepository() {
+ return new RemoteRepository("local",
+ "default", "file://" + System.getProperty("user.home") + "/.m2/repository");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Dependency.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Dependency.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Dependency.java
new file mode 100644
index 0000000..8f77de4
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Dependency.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *
+ */
+public class Dependency {
+ private String groupArtifactVersion;
+ private boolean local = false;
+ private List<String> exclusions;
+
+
+ public Dependency(String groupArtifactVersion) {
+ this.groupArtifactVersion = groupArtifactVersion;
+ exclusions = new LinkedList<String>();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Dependency)) {
+ return false;
+ } else {
+ return ((Dependency) o).groupArtifactVersion.equals(groupArtifactVersion);
+ }
+ }
+
+ /**
+ * Don't add artifact into SparkContext (sc.addJar())
+ * @return
+ */
+ public Dependency local() {
+ local = true;
+ return this;
+ }
+
+ public Dependency excludeAll() {
+ exclude("*");
+ return this;
+ }
+
+ /**
+ *
+ * @param exclusions comma or newline separated list of "groupId:ArtifactId"
+ * @return
+ */
+ public Dependency exclude(String exclusions) {
+ for (String item : exclusions.split(",|\n")) {
+ this.exclusions.add(item);
+ }
+
+ return this;
+ }
+
+
+ public String getGroupArtifactVersion() {
+ return groupArtifactVersion;
+ }
+
+ public boolean isDist() {
+ return !local;
+ }
+
+ public List<String> getExclusions() {
+ return exclusions;
+ }
+
+ public boolean isLocalFsArtifact() {
+ int numSplits = groupArtifactVersion.split(":").length;
+ return !(numSplits >= 3 && numSplits <= 6);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyContext.java
new file mode 100644
index 0000000..ab4da28
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyContext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.sonatype.aether.RepositorySystem;
+import org.sonatype.aether.RepositorySystemSession;
+import org.sonatype.aether.artifact.Artifact;
+import org.sonatype.aether.collection.CollectRequest;
+import org.sonatype.aether.graph.DependencyFilter;
+import org.sonatype.aether.repository.RemoteRepository;
+import org.sonatype.aether.resolution.ArtifactResolutionException;
+import org.sonatype.aether.resolution.ArtifactResult;
+import org.sonatype.aether.resolution.DependencyRequest;
+import org.sonatype.aether.resolution.DependencyResolutionException;
+import org.sonatype.aether.util.artifact.DefaultArtifact;
+import org.sonatype.aether.util.artifact.JavaScopes;
+import org.sonatype.aether.util.filter.DependencyFilterUtils;
+import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
+
+
+/**
+ *
+ */
+public class DependencyContext {
+ List<Dependency> dependencies = new LinkedList<Dependency>();
+ List<Repository> repositories = new LinkedList<Repository>();
+
+ List<File> files = new LinkedList<File>();
+ List<File> filesDist = new LinkedList<File>();
+ private RepositorySystem system = Booter.newRepositorySystem();
+ private RepositorySystemSession session;
+ private RemoteRepository mavenCentral = Booter.newCentralRepository();
+ private RemoteRepository mavenLocal = Booter.newLocalRepository();
+
+ public DependencyContext(String localRepoPath) {
+ session = Booter.newRepositorySystemSession(system, localRepoPath);
+ }
+
+ public Dependency load(String lib) {
+ Dependency dep = new Dependency(lib);
+
+ if (dependencies.contains(dep)) {
+ dependencies.remove(dep);
+ }
+ dependencies.add(dep);
+ return dep;
+ }
+
+ public Repository addRepo(String name) {
+ Repository rep = new Repository(name);
+ repositories.add(rep);
+ return rep;
+ }
+
+ public void reset() {
+ dependencies = new LinkedList<Dependency>();
+ repositories = new LinkedList<Repository>();
+
+ files = new LinkedList<File>();
+ filesDist = new LinkedList<File>();
+ }
+
+
+ /**
+ * fetch all artifacts
+ * @return
+ * @throws MalformedURLException
+ * @throws ArtifactResolutionException
+ * @throws DependencyResolutionException
+ */
+ public List<File> fetch() throws MalformedURLException,
+ DependencyResolutionException, ArtifactResolutionException {
+
+ for (Dependency dep : dependencies) {
+ if (!dep.isLocalFsArtifact()) {
+ List<ArtifactResult> artifacts = fetchArtifactWithDep(dep);
+ for (ArtifactResult artifact : artifacts) {
+ if (dep.isDist()) {
+ filesDist.add(artifact.getArtifact().getFile());
+ }
+ files.add(artifact.getArtifact().getFile());
+ }
+ } else {
+ if (dep.isDist()) {
+ filesDist.add(new File(dep.getGroupArtifactVersion()));
+ }
+ files.add(new File(dep.getGroupArtifactVersion()));
+ }
+ }
+
+ return files;
+ }
+
+ private List<ArtifactResult> fetchArtifactWithDep(Dependency dep)
+ throws DependencyResolutionException, ArtifactResolutionException {
+ Artifact artifact = new DefaultArtifact(dep.getGroupArtifactVersion());
+
+ DependencyFilter classpathFlter = DependencyFilterUtils
+ .classpathFilter(JavaScopes.COMPILE);
+ PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter(
+ dep.getExclusions());
+
+ CollectRequest collectRequest = new CollectRequest();
+ collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact,
+ JavaScopes.COMPILE));
+
+ collectRequest.addRepository(mavenCentral);
+ collectRequest.addRepository(mavenLocal);
+ for (Repository repo : repositories) {
+ RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl());
+ rr.setPolicy(repo.isSnapshot(), null);
+ collectRequest.addRepository(rr);
+ }
+
+ DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
+ DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
+
+ return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
+ }
+
+ public List<File> getFiles() {
+ return files;
+ }
+
+ public List<File> getFilesDist() {
+ return filesDist;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java
new file mode 100644
index 0000000..cbe88bc
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.artifact.Artifact;
+import org.sonatype.aether.collection.CollectRequest;
+import org.sonatype.aether.graph.Dependency;
+import org.sonatype.aether.graph.DependencyFilter;
+import org.sonatype.aether.repository.RemoteRepository;
+import org.sonatype.aether.resolution.ArtifactResult;
+import org.sonatype.aether.resolution.DependencyRequest;
+import org.sonatype.aether.util.artifact.DefaultArtifact;
+import org.sonatype.aether.util.artifact.JavaScopes;
+import org.sonatype.aether.util.filter.DependencyFilterUtils;
+import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
+
+
+/**
+ * Deps resolver.
+ * Add new dependencies from mvn repo (at runtime) to Zeppelin.
+ */
+public class DependencyResolver extends AbstractDependencyResolver {
+ Logger logger = LoggerFactory.getLogger(DependencyResolver.class);
+
+ private final String[] exclusions = new String[] {"org.apache.zeppelin:zeppelin-zengine",
+ "org.apache.zeppelin:zeppelin-interpreter",
+ "org.apache.zeppelin:zeppelin-server"};
+
+ public DependencyResolver(String localRepoPath) {
+ super(localRepoPath);
+ }
+
+ public List<File> load(String artifact) throws Exception {
+ return load(artifact, new LinkedList<String>());
+ }
+
+ public List<File> load(String artifact, String destPath) throws Exception {
+ return load(artifact, new LinkedList<String>(), destPath);
+ }
+
+ public synchronized List<File> load(String artifact, Collection<String> excludes)
+ throws Exception {
+ if (StringUtils.isBlank(artifact)) {
+ // Should throw here
+ throw new RuntimeException("Invalid artifact to load");
+ }
+
+ // <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
+ int numSplits = artifact.split(":").length;
+ if (numSplits >= 3 && numSplits <= 6) {
+ return loadFromMvn(artifact, excludes);
+ } else {
+ LinkedList<File> libs = new LinkedList<File>();
+ libs.add(new File(artifact));
+ return libs;
+ }
+ }
+
+ public List<File> load(String artifact, Collection<String> excludes, String destPath)
+ throws Exception {
+ List<File> libs = load(artifact, excludes);
+
+ // find home dir
+ String home = System.getenv("ZEPPELIN_HOME");
+ if (home == null) {
+ home = System.getProperty("zeppelin.home");
+ }
+ if (home == null) {
+ home = "..";
+ }
+
+ for (File srcFile: libs) {
+ File destFile = new File(home + "/" + destPath, srcFile.getName());
+ if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) {
+ FileUtils.copyFile(srcFile, destFile);
+ logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath);
+ }
+ }
+ return libs;
+ }
+
+ private List<File> loadFromMvn(String artifact, Collection<String> excludes) throws Exception {
+ Collection<String> allExclusions = new LinkedList<String>();
+ allExclusions.addAll(excludes);
+ allExclusions.addAll(Arrays.asList(exclusions));
+
+ List<ArtifactResult> listOfArtifact;
+ listOfArtifact = getArtifactsWithDep(artifact, allExclusions);
+
+ Iterator<ArtifactResult> it = listOfArtifact.iterator();
+ while (it.hasNext()) {
+ Artifact a = it.next().getArtifact();
+ String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion();
+ for (String exclude : allExclusions) {
+ if (gav.startsWith(exclude)) {
+ it.remove();
+ break;
+ }
+ }
+ }
+
+ List<File> files = new LinkedList<File>();
+ for (ArtifactResult artifactResult : listOfArtifact) {
+ files.add(artifactResult.getArtifact().getFile());
+ logger.info("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());
+ }
+
+ return files;
+ }
+
+ /**
+ * @param dependency
+ * @param excludes list of pattern can either be of the form groupId:artifactId
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public List<ArtifactResult> getArtifactsWithDep(String dependency,
+ Collection<String> excludes) throws Exception {
+ Artifact artifact = new DefaultArtifact(dependency);
+ DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE);
+ PatternExclusionsDependencyFilter exclusionFilter =
+ new PatternExclusionsDependencyFilter(excludes);
+
+ CollectRequest collectRequest = new CollectRequest();
+ collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE));
+
+ synchronized (repos) {
+ for (RemoteRepository repo : repos) {
+ collectRequest.addRepository(repo);
+ }
+ }
+ DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
+ DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter));
+ return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Repository.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Repository.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Repository.java
new file mode 100644
index 0000000..4c2d867
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Repository.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+import org.sonatype.aether.repository.Authentication;
+/**
+ *
+ *
+ */
+public class Repository {
+ private boolean snapshot = false;
+ private String name;
+ private String url;
+ private String username = null;
+ private String password = null;
+
+ public Repository(String name){
+ this.name = name;
+ }
+
+ public Repository url(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public Repository snapshot() {
+ snapshot = true;
+ return this;
+ }
+
+ public boolean isSnapshot() {
+ return snapshot;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public Repository username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public Repository password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public Repository credentials(String username, String password) {
+ this.username = username;
+ this.password = password;
+ return this;
+ }
+
+ public Authentication getAuthentication() {
+ Authentication auth = null;
+ if (this.username != null && this.password != null) {
+ auth = new Authentication(this.username, this.password);
+ }
+ return auth;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositoryListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositoryListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositoryListener.java
new file mode 100644
index 0000000..9f62d5f
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositoryListener.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.AbstractRepositoryListener;
+import org.sonatype.aether.RepositoryEvent;
+
+/**
+ * Simple listener that print log.
+ */
+public class RepositoryListener extends AbstractRepositoryListener {
+ Logger logger = LoggerFactory.getLogger(RepositoryListener.class);
+
+ public RepositoryListener() {}
+
+ @Override
+ public void artifactDeployed(RepositoryEvent event) {
+ logger.info("Deployed " + event.getArtifact() + " to " + event.getRepository());
+ }
+
+ @Override
+ public void artifactDeploying(RepositoryEvent event) {
+ logger.info("Deploying " + event.getArtifact() + " to " + event.getRepository());
+ }
+
+ @Override
+ public void artifactDescriptorInvalid(RepositoryEvent event) {
+ logger.info("Invalid artifact descriptor for " + event.getArtifact() + ": "
+ + event.getException().getMessage());
+ }
+
+ @Override
+ public void artifactDescriptorMissing(RepositoryEvent event) {
+ logger.info("Missing artifact descriptor for " + event.getArtifact());
+ }
+
+ @Override
+ public void artifactInstalled(RepositoryEvent event) {
+ logger.info("Installed " + event.getArtifact() + " to " + event.getFile());
+ }
+
+ @Override
+ public void artifactInstalling(RepositoryEvent event) {
+ logger.info("Installing " + event.getArtifact() + " to " + event.getFile());
+ }
+
+ @Override
+ public void artifactResolved(RepositoryEvent event) {
+ logger.info("Resolved artifact " + event.getArtifact() + " from " + event.getRepository());
+ }
+
+ @Override
+ public void artifactDownloading(RepositoryEvent event) {
+ logger.info("Downloading artifact " + event.getArtifact() + " from " + event.getRepository());
+ }
+
+ @Override
+ public void artifactDownloaded(RepositoryEvent event) {
+ logger.info("Downloaded artifact " + event.getArtifact() + " from " + event.getRepository());
+ }
+
+ @Override
+ public void artifactResolving(RepositoryEvent event) {
+ logger.info("Resolving artifact " + event.getArtifact());
+ }
+
+ @Override
+ public void metadataDeployed(RepositoryEvent event) {
+ logger.info("Deployed " + event.getMetadata() + " to " + event.getRepository());
+ }
+
+ @Override
+ public void metadataDeploying(RepositoryEvent event) {
+ logger.info("Deploying " + event.getMetadata() + " to " + event.getRepository());
+ }
+
+ @Override
+ public void metadataInstalled(RepositoryEvent event) {
+ logger.info("Installed " + event.getMetadata() + " to " + event.getFile());
+ }
+
+ @Override
+ public void metadataInstalling(RepositoryEvent event) {
+ logger.info("Installing " + event.getMetadata() + " to " + event.getFile());
+ }
+
+ @Override
+ public void metadataInvalid(RepositoryEvent event) {
+ logger.info("Invalid metadata " + event.getMetadata());
+ }
+
+ @Override
+ public void metadataResolved(RepositoryEvent event) {
+ logger.info("Resolved metadata " + event.getMetadata() + " from " + event.getRepository());
+ }
+
+ @Override
+ public void metadataResolving(RepositoryEvent event) {
+ logger.info("Resolving metadata " + event.getMetadata() + " from " + event.getRepository());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositorySystemFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositorySystemFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositorySystemFactory.java
new file mode 100644
index 0000000..a224603
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositorySystemFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import org.apache.maven.repository.internal.DefaultServiceLocator;
+import org.apache.maven.wagon.Wagon;
+import org.apache.maven.wagon.providers.http.HttpWagon;
+import org.apache.maven.wagon.providers.http.LightweightHttpWagon;
+import org.sonatype.aether.RepositorySystem;
+import org.sonatype.aether.connector.file.FileRepositoryConnectorFactory;
+import org.sonatype.aether.connector.wagon.WagonProvider;
+import org.sonatype.aether.connector.wagon.WagonRepositoryConnectorFactory;
+import org.sonatype.aether.spi.connector.RepositoryConnectorFactory;
+
+/**
+ * Get maven repository instance.
+ */
+public class RepositorySystemFactory {
+ public static RepositorySystem newRepositorySystem() {
+ DefaultServiceLocator locator = new DefaultServiceLocator();
+ locator.addService(RepositoryConnectorFactory.class, FileRepositoryConnectorFactory.class);
+ locator.addService(RepositoryConnectorFactory.class, WagonRepositoryConnectorFactory.class);
+ locator.setServices(WagonProvider.class, new ManualWagonProvider());
+
+ return locator.getService(RepositorySystem.class);
+ }
+
+ /**
+ * ManualWagonProvider
+ */
+ public static class ManualWagonProvider implements WagonProvider {
+
+ @Override
+ public Wagon lookup(String roleHint) throws Exception {
+ if ("http".equals(roleHint)) {
+ return new LightweightHttpWagon();
+ }
+
+ if ("https".equals(roleHint)) {
+ return new HttpWagon();
+ }
+
+ return null;
+ }
+
+ @Override
+ public void release(Wagon arg0) {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/TransferListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/TransferListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/TransferListener.java
new file mode 100644
index 0000000..277a303
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/TransferListener.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.transfer.AbstractTransferListener;
+import org.sonatype.aether.transfer.TransferEvent;
+import org.sonatype.aether.transfer.TransferResource;
+
+/**
+ * Simple listener that show deps downloading progress.
+ */
+public class TransferListener extends AbstractTransferListener {
+ Logger logger = LoggerFactory.getLogger(TransferListener.class);
+ private PrintStream out;
+
+ private Map<TransferResource, Long> downloads = new ConcurrentHashMap<TransferResource, Long>();
+
+ private int lastLength;
+
+ public TransferListener() {}
+
+ @Override
+ public void transferInitiated(TransferEvent event) {
+ String message =
+ event.getRequestType() == TransferEvent.RequestType.PUT ? "Uploading" : "Downloading";
+
+ logger.info(message + ": " + event.getResource().getRepositoryUrl()
+ + event.getResource().getResourceName());
+ }
+
+ @Override
+ public void transferProgressed(TransferEvent event) {
+ TransferResource resource = event.getResource();
+ downloads.put(resource, Long.valueOf(event.getTransferredBytes()));
+
+ StringBuilder buffer = new StringBuilder(64);
+
+ for (Map.Entry<TransferResource, Long> entry : downloads.entrySet()) {
+ long total = entry.getKey().getContentLength();
+ long complete = entry.getValue().longValue();
+
+ buffer.append(getStatus(complete, total)).append(" ");
+ }
+
+ int pad = lastLength - buffer.length();
+ lastLength = buffer.length();
+ pad(buffer, pad);
+ buffer.append('\r');
+
+ logger.info(buffer.toString());
+ }
+
+ private String getStatus(long complete, long total) {
+ if (total >= 1024) {
+ return toKB(complete) + "/" + toKB(total) + " KB ";
+ } else if (total >= 0) {
+ return complete + "/" + total + " B ";
+ } else if (complete >= 1024) {
+ return toKB(complete) + " KB ";
+ } else {
+ return complete + " B ";
+ }
+ }
+
+ private void pad(StringBuilder buffer, int spaces) {
+ String block = " ";
+ while (spaces > 0) {
+ int n = Math.min(spaces, block.length());
+ buffer.append(block, 0, n);
+ spaces -= n;
+ }
+ }
+
+ @Override
+ public void transferSucceeded(TransferEvent event) {
+ transferCompleted(event);
+
+ TransferResource resource = event.getResource();
+ long contentLength = event.getTransferredBytes();
+ if (contentLength >= 0) {
+ String type =
+ (event.getRequestType() == TransferEvent.RequestType.PUT ? "Uploaded" : "Downloaded");
+ String len = contentLength >= 1024 ? toKB(contentLength) + " KB" : contentLength + " B";
+
+ String throughput = "";
+ long duration = System.currentTimeMillis() - resource.getTransferStartTime();
+ if (duration > 0) {
+ DecimalFormat format = new DecimalFormat("0.0", new DecimalFormatSymbols(Locale.ENGLISH));
+ double kbPerSec = (contentLength / 1024.0) / (duration / 1000.0);
+ throughput = " at " + format.format(kbPerSec) + " KB/sec";
+ }
+
+ logger.info(type + ": " + resource.getRepositoryUrl() + resource.getResourceName() + " ("
+ + len + throughput + ")");
+ }
+ }
+
+ @Override
+ public void transferFailed(TransferEvent event) {
+ transferCompleted(event);
+ event.getException().printStackTrace(out);
+ }
+
+ private void transferCompleted(TransferEvent event) {
+ downloads.remove(event.getResource());
+ StringBuilder buffer = new StringBuilder(64);
+ pad(buffer, lastLength);
+ buffer.append('\r');
+ logger.info(buffer.toString());
+ }
+
+ @Override
+ public void transferCorrupted(TransferEvent event) {
+ event.getException().printStackTrace(out);
+ }
+
+ protected long toKB(long bytes) {
+ return (bytes + 1023) / 1024;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java
new file mode 100644
index 0000000..33b7e54
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.dep;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class DependencyResolverTest {
+ private static DependencyResolver resolver;
+ private static String testPath;
+ private static String testCopyPath;
+ private static String home;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ testPath = "test-repo";
+ testCopyPath = "test-copy-repo";
+ resolver = new DependencyResolver(testPath);
+ home = System.getenv("ZEPPELIN_HOME");
+ if (home == null) {
+ home = System.getProperty("zeppelin.home");
+ }
+ if (home == null) {
+ home = "..";
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ FileUtils.deleteDirectory(new File(home + "/" + testPath));
+ FileUtils.deleteDirectory(new File(home + "/" + testCopyPath));
+ }
+
+ @Test
+ public void testLoad() throws Exception {
+ resolver.load("org.apache.commons:commons-lang3:3.4", testCopyPath);
+
+ assertTrue(new File(home + "/" + testPath + "/org/apache/commons/commons-lang3/3.4/").exists());
+ assertTrue(new File(home + "/" + testCopyPath + "/commons-lang3-3.4.jar").exists());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 7ad2b71..9e7a97c 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -30,6 +30,7 @@ import javax.ws.rs.core.Application;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
@@ -73,12 +74,14 @@ public class ZeppelinServer extends Application {
private InterpreterFactory replFactory;
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
+ private DependencyResolver depResolver;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+ this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO));
this.schedulerFactory = new SchedulerFactory();
- this.replFactory = new InterpreterFactory(conf, notebookWsServer);
+ this.replFactory = new InterpreterFactory(conf, notebookWsServer, depResolver);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index ca63eef..edcf513 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -428,6 +428,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
// Decide when new note is created, interpreter settings will be binded automatically or not.
ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true),
ZEPPELIN_CONF_DIR("zeppelin.conf.dir", "conf"),
+ ZEPPELIN_DEP_LOCALREPO("zeppelin.dep.localrepo", "local-repo"),
// Allows a way to specify a ',' separated list of allowed origins for rest and websockets
// i.e. http://localhost:8080
ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"),
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index fc8cc04..4ff0cc3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.NullArgumentException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
@@ -65,19 +66,24 @@ public class InterpreterFactory {
AngularObjectRegistryListener angularObjectRegistryListener;
+ DependencyResolver depResolver;
+
public InterpreterFactory(ZeppelinConfiguration conf,
- AngularObjectRegistryListener angularObjectRegistryListener)
+ AngularObjectRegistryListener angularObjectRegistryListener,
+ DependencyResolver depResolver)
throws InterpreterException, IOException {
- this(conf, new InterpreterOption(true), angularObjectRegistryListener);
+ this(conf, new InterpreterOption(true), angularObjectRegistryListener, depResolver);
}
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
- AngularObjectRegistryListener angularObjectRegistryListener)
+ AngularObjectRegistryListener angularObjectRegistryListener,
+ DependencyResolver depResolver)
throws InterpreterException, IOException {
this.conf = conf;
this.defaultOption = defaultOption;
this.angularObjectRegistryListener = angularObjectRegistryListener;
+ this.depResolver = depResolver;
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
interpreterClassList = replsConf.split(",");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index 6a69b83..abd0e3b 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -38,32 +38,32 @@ import org.junit.Test;
public class InterpreterFactoryTest {
- private InterpreterFactory factory;
+ private InterpreterFactory factory;
private File tmpDir;
private ZeppelinConfiguration conf;
private InterpreterContext context;
@Before
- public void setUp() throws Exception {
+ public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
tmpDir.mkdirs();
new File(tmpDir, "conf").mkdirs();
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
- MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
+ MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
- System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- conf = new ZeppelinConfiguration();
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
- context = new InterpreterContext("note", "id", "title", "text", null, null, null, null);
+ System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
+ System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
+ conf = new ZeppelinConfiguration();
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+ context = new InterpreterContext("note", "id", "title", "text", null, null, null, null);
- }
+ }
- @After
- public void tearDown() throws Exception {
- delete(tmpDir);
- }
+ @After
+ public void tearDown() throws Exception {
+ delete(tmpDir);
+ }
private void delete(File file){
if(file.isFile()) file.delete();
@@ -78,24 +78,24 @@ public class InterpreterFactoryTest {
}
}
- @Test
- public void testBasic() {
- List<String> all = factory.getDefaultInterpreterSettingList();
+ @Test
+ public void testBasic() {
+ List<String> all = factory.getDefaultInterpreterSettingList();
- // get interpreter
- Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
- assertFalse(((LazyOpenInterpreter) repl1).isOpen());
- repl1.interpret("repl1", context);
- assertTrue(((LazyOpenInterpreter) repl1).isOpen());
+ // get interpreter
+ Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
+ assertFalse(((LazyOpenInterpreter) repl1).isOpen());
+ repl1.interpret("repl1", context);
+ assertTrue(((LazyOpenInterpreter) repl1).isOpen());
- // try to get unavailable interpreter
- assertNull(factory.get("unknown"));
+ // try to get unavailable interpreter
+ assertNull(factory.get("unknown"));
- // restart interpreter
- factory.restart(all.get(0));
- repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
- assertFalse(((LazyOpenInterpreter) repl1).isOpen());
- }
+ // restart interpreter
+ factory.restart(all.get(0));
+ repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
+ assertFalse(((LazyOpenInterpreter) repl1).isOpen());
+ }
@Test
public void testFactoryDefaultList() throws IOException {
@@ -119,8 +119,8 @@ public class InterpreterFactoryTest {
try {
factory.add("a mock", "mock2", null, new Properties());
} catch(NullArgumentException e) {
- assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage());
- }
+ assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage());
+ }
try {
factory.add("a mock" , "mock2" , new InterpreterOption(false),null);
} catch (NullArgumentException e){
@@ -140,7 +140,7 @@ public class InterpreterFactoryTest {
factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties());
assertEquals(3, factory.get().size());
- InterpreterFactory factory2 = new InterpreterFactory(conf, null);
+ InterpreterFactory factory2 = new InterpreterFactory(conf, null, null);
assertEquals(3, factory2.get().size());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
index 2e1f5e3..a0455eb 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
@@ -58,7 +58,7 @@ public class NoteInterpreterLoaderTest {
MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11");
MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 917ea6b..34f7a1b 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -85,7 +85,7 @@ public class NotebookTest implements JobListenerFactory{
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
@@ -172,7 +172,7 @@ public class NotebookTest implements JobListenerFactory{
note.persist();
Notebook notebook2 = new Notebook(
- conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this, null);
+ conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null), this, null);
assertEquals(1, notebook2.getAllNotes().size());
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
index 4e9e180..60b3ba3 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
@@ -87,7 +87,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
SearchService search = mock(SearchService.class);
notebookRepoSync = new NotebookRepoSync(conf);
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
index 65be61b..cff086d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
@@ -76,7 +76,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
this.schedulerFactory = new SchedulerFactory();
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
[2/2] incubator-zeppelin git commit: ZEPPELIN-546 Enables interpreter
library loading from maven repository
Posted by mo...@apache.org.
ZEPPELIN-546 Enables interpreter library loading from maven repository
### What is this PR for?
This PR enables library loading from maven repository to load interpreter binaries.
To leverage current spark interpreter's library loading, moved org.apache.zeppelin.spark.dep(under spark) to org.apache.zeppelin.dep(under zeppelin-interpreter).
Making REST API and loading interpreter property on runtime will be the next step to complete [ZEPPELIN-546](https://issues.apache.org/jira/browse/ZEPPELIN-546?jql=project%20%3D%20ZEPPELIN) and will be handled in different PR.
### What type of PR is it?
Feature
### Is there a relevant Jira issue?
[ZEPPELIN-546](https://issues.apache.org/jira/browse/ZEPPELIN-546?jql=project%20%3D%20ZEPPELIN)
### Questions:
* Does the licenses files need update? No (dependencies added to `zeppelin-interpreter/pom.xml` is already used in other module)
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes (will be addressed in another PR)
Author: Mina Lee <mi...@nflabs.com>
Closes #590 from minahlee/ZEPPELIN-546 and squashes the following commits:
cd190d4 [Mina Lee] [ZEPPELIN-546] Refactoring
72ad12c [Mina Lee] [ZEPPELIN-546] Add test
039f5fa [Mina Lee] [ZEPPELIN-546] Fix tests
4baf271 [Mina Lee] [ZEPPELIN-546] Pass DependencyResolver to InterpreterFactory
aab2d04 [Mina Lee] [ZEPPELIN-546] Load interpreter from maven repository - Move org.apache.zeppelin.spark.dep package from zeppelin-spark to zeppelin-interpreter - Rename DependencyResolver/DependencyContext to SparkDependencyResolver/SparkDependencyContext - Add general DependencyResolver
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/bc715511
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/bc715511
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/bc715511
Branch: refs/heads/master
Commit: bc715511404985b60c4cd302be305bf8fd917bf7
Parents: 0c42f43
Author: Mina Lee <mi...@nflabs.com>
Authored: Tue Jan 12 16:12:38 2016 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Tue Jan 12 19:17:50 2016 -0800
----------------------------------------------------------------------
.../apache/zeppelin/spark/DepInterpreter.java | 36 +-
.../zeppelin/spark/PySparkInterpreter.java | 4 +-
.../apache/zeppelin/spark/SparkInterpreter.java | 14 +-
.../apache/zeppelin/spark/ZeppelinContext.java | 6 +-
.../org/apache/zeppelin/spark/dep/Booter.java | 72 ----
.../apache/zeppelin/spark/dep/Dependency.java | 90 -----
.../zeppelin/spark/dep/DependencyContext.java | 179 ---------
.../zeppelin/spark/dep/DependencyResolver.java | 385 -------------------
.../apache/zeppelin/spark/dep/Repository.java | 80 ----
.../zeppelin/spark/dep/RepositoryListener.java | 118 ------
.../spark/dep/RepositorySystemFactory.java | 66 ----
.../spark/dep/SparkDependencyContext.java | 181 +++++++++
.../spark/dep/SparkDependencyResolver.java | 351 +++++++++++++++++
.../zeppelin/spark/dep/TransferListener.java | 145 -------
.../spark/dep/DependencyResolverTest.java | 52 ---
.../spark/dep/SparkDependencyResolverTest.java | 52 +++
zeppelin-interpreter/pom.xml | 117 ++++++
.../dep/AbstractDependencyResolver.java | 70 ++++
.../java/org/apache/zeppelin/dep/Booter.java | 72 ++++
.../org/apache/zeppelin/dep/Dependency.java | 90 +++++
.../apache/zeppelin/dep/DependencyContext.java | 148 +++++++
.../apache/zeppelin/dep/DependencyResolver.java | 163 ++++++++
.../org/apache/zeppelin/dep/Repository.java | 80 ++++
.../apache/zeppelin/dep/RepositoryListener.java | 118 ++++++
.../zeppelin/dep/RepositorySystemFactory.java | 66 ++++
.../apache/zeppelin/dep/TransferListener.java | 145 +++++++
.../zeppelin/dep/DependencyResolverTest.java | 62 +++
.../apache/zeppelin/server/ZeppelinServer.java | 5 +-
.../zeppelin/conf/ZeppelinConfiguration.java | 1 +
.../interpreter/InterpreterFactory.java | 12 +-
.../interpreter/InterpreterFactoryTest.java | 62 +--
.../notebook/NoteInterpreterLoaderTest.java | 2 +-
.../apache/zeppelin/notebook/NotebookTest.java | 4 +-
.../notebook/repo/NotebookRepoSyncTest.java | 2 +-
.../notebook/repo/VFSNotebookRepoTest.java | 2 +-
35 files changed, 1806 insertions(+), 1246 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
index 7a17aa0..a4fdae3 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
@@ -40,7 +40,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.spark.dep.DependencyContext;
+import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonatype.aether.resolution.ArtifactResolutionException;
@@ -69,7 +69,9 @@ public class DepInterpreter extends Interpreter {
"spark",
DepInterpreter.class.getName(),
new InterpreterPropertyBuilder()
- .add("zeppelin.dep.localrepo", "local-repo", "local repository for dependency loader")
+ .add("zeppelin.dep.localrepo",
+ getSystemDefault("ZEPPELIN_DEP_LOCALREPO", null, "local-repo"),
+ "local repository for dependency loader")
.add("zeppelin.dep.additionalRemoteRepository",
"spark-packages,http://dl.bintray.com/spark-packages/maven,false;",
"A list of 'id,remote-repository-URL,is-snapshot;' for each remote repository.")
@@ -79,7 +81,7 @@ public class DepInterpreter extends Interpreter {
private SparkIMain intp;
private ByteArrayOutputStream out;
- private DependencyContext depc;
+ private SparkDependencyContext depc;
private SparkJLineCompletion completor;
private SparkILoop interpreter;
static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class);
@@ -88,10 +90,30 @@ public class DepInterpreter extends Interpreter {
super(property);
}
- public DependencyContext getDependencyContext() {
+ public SparkDependencyContext getDependencyContext() {
return depc;
}
+ public static String getSystemDefault(
+ String envName,
+ String propertyName,
+ String defaultValue) {
+
+ if (envName != null && !envName.isEmpty()) {
+ String envValue = System.getenv().get(envName);
+ if (envValue != null) {
+ return envValue;
+ }
+ }
+
+ if (propertyName != null && !propertyName.isEmpty()) {
+ String propValue = System.getProperty(propertyName);
+ if (propValue != null) {
+ return propValue;
+ }
+ }
+ return defaultValue;
+ }
@Override
public void close() {
@@ -152,16 +174,16 @@ public class DepInterpreter extends Interpreter {
intp.setContextClassLoader();
intp.initializeSynchronous();
- depc = new DependencyContext(getProperty("zeppelin.dep.localrepo"),
+ depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"),
getProperty("zeppelin.dep.additionalRemoteRepository"));
completor = new SparkJLineCompletion(intp);
-
intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
Map<String, Object> binder = (Map<String, Object>) getValue("_binder");
binder.put("depc", depc);
intp.interpret("@transient val z = "
- + "_binder.get(\"depc\").asInstanceOf[org.apache.zeppelin.spark.dep.DependencyContext]");
+ + "_binder.get(\"depc\")"
+ + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]");
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 0bfad6a..8c4ba87 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -54,7 +54,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.spark.dep.DependencyContext;
+import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,7 +127,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
URL [] urls = new URL[0];
if (depInterpreter != null) {
- DependencyContext depc = depInterpreter.getDependencyContext();
+ SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {
List<File> files = depc.getFiles();
List<URL> urlList = new LinkedList<URL>();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 2bf7a6b..d975791 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -55,8 +55,8 @@ import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.spark.dep.DependencyContext;
-import org.apache.zeppelin.spark.dep.DependencyResolver;
+import org.apache.zeppelin.spark.dep.SparkDependencyContext;
+import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +117,7 @@ public class SparkInterpreter extends Interpreter {
private SparkContext sc;
private ByteArrayOutputStream out;
private SQLContext sqlc;
- private DependencyResolver dep;
+ private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
private JobProgressListener sparkListener;
@@ -222,9 +222,9 @@ public class SparkInterpreter extends Interpreter {
return sqlc;
}
- public DependencyResolver getDependencyResolver() {
+ public SparkDependencyResolver getDependencyResolver() {
if (dep == null) {
- dep = new DependencyResolver(intp,
+ dep = new SparkDependencyResolver(intp,
sc,
getProperty("zeppelin.dep.localrepo"),
getProperty("zeppelin.dep.additionalRemoteRepository"));
@@ -427,7 +427,7 @@ public class SparkInterpreter extends Interpreter {
// add dependency from DepInterpreter
DepInterpreter depInterpreter = getDepInterpreter();
if (depInterpreter != null) {
- DependencyContext depc = depInterpreter.getDependencyContext();
+ SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {
List<File> files = depc.getFiles();
if (files != null) {
@@ -536,7 +536,7 @@ public class SparkInterpreter extends Interpreter {
// add jar
if (depInterpreter != null) {
- DependencyContext depc = depInterpreter.getDependencyContext();
+ SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {
List<File> files = depc.getFilesDist();
if (files != null) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 5ec38d4..af806bf 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -43,7 +43,7 @@ import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.spark.dep.DependencyResolver;
+import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import scala.Tuple2;
import scala.Unit;
@@ -53,14 +53,14 @@ import scala.collection.Iterable;
* Spark context for zeppelin.
*/
public class ZeppelinContext extends HashMap<String, Object> {
- private DependencyResolver dep;
+ private SparkDependencyResolver dep;
private PrintStream out;
private InterpreterContext interpreterContext;
private int maxResult;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
- DependencyResolver dep, PrintStream printStream,
+ SparkDependencyResolver dep, PrintStream printStream,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/Booter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/Booter.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/Booter.java
deleted file mode 100644
index 1b7a6d6..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/Booter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import java.io.File;
-
-import org.apache.maven.repository.internal.MavenRepositorySystemSession;
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.RepositorySystemSession;
-import org.sonatype.aether.repository.LocalRepository;
-import org.sonatype.aether.repository.RemoteRepository;
-
-/**
- * Manage mvn repository.
- */
-public class Booter {
- public static RepositorySystem newRepositorySystem() {
- return RepositorySystemFactory.newRepositorySystem();
- }
-
- public static RepositorySystemSession newRepositorySystemSession(
- RepositorySystem system, String localRepoPath) {
- MavenRepositorySystemSession session = new MavenRepositorySystemSession();
-
- // find homedir
- String home = System.getenv("ZEPPELIN_HOME");
- if (home == null) {
- home = System.getProperty("zeppelin.home");
- }
- if (home == null) {
- home = "..";
- }
-
- String path = home + "/" + localRepoPath;
-
- LocalRepository localRepo =
- new LocalRepository(new File(path).getAbsolutePath());
- session.setLocalRepositoryManager(system.newLocalRepositoryManager(localRepo));
-
- // session.setTransferListener(new ConsoleTransferListener());
- // session.setRepositoryListener(new ConsoleRepositoryListener());
-
- // uncomment to generate dirty trees
- // session.setDependencyGraphTransformer( null );
-
- return session;
- }
-
- public static RemoteRepository newCentralRepository() {
- return new RemoteRepository("central", "default", "http://repo1.maven.org/maven2/");
- }
-
- public static RemoteRepository newLocalRepository() {
- return new RemoteRepository("local",
- "default", "file://" + System.getProperty("user.home") + "/.m2/repository");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/Dependency.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/Dependency.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/Dependency.java
deleted file mode 100644
index ca92893..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/Dependency.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- *
- */
-public class Dependency {
- private String groupArtifactVersion;
- private boolean local = false;
- private List<String> exclusions;
-
-
- public Dependency(String groupArtifactVersion) {
- this.groupArtifactVersion = groupArtifactVersion;
- exclusions = new LinkedList<String>();
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Dependency)) {
- return false;
- } else {
- return ((Dependency) o).groupArtifactVersion.equals(groupArtifactVersion);
- }
- }
-
- /**
- * Don't add artifact into SparkContext (sc.addJar())
- * @return
- */
- public Dependency local() {
- local = true;
- return this;
- }
-
- public Dependency excludeAll() {
- exclude("*");
- return this;
- }
-
- /**
- *
- * @param exclusions comma or newline separated list of "groupId:ArtifactId"
- * @return
- */
- public Dependency exclude(String exclusions) {
- for (String item : exclusions.split(",|\n")) {
- this.exclusions.add(item);
- }
-
- return this;
- }
-
-
- public String getGroupArtifactVersion() {
- return groupArtifactVersion;
- }
-
- public boolean isDist() {
- return !local;
- }
-
- public List<String> getExclusions() {
- return exclusions;
- }
-
- public boolean isLocalFsArtifact() {
- int numSplits = groupArtifactVersion.split(":").length;
- return !(numSplits >= 3 && numSplits <= 6);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyContext.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyContext.java
deleted file mode 100644
index 834e518..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyContext.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.RepositorySystemSession;
-import org.sonatype.aether.artifact.Artifact;
-import org.sonatype.aether.collection.CollectRequest;
-import org.sonatype.aether.graph.DependencyFilter;
-import org.sonatype.aether.repository.RemoteRepository;
-import org.sonatype.aether.repository.Authentication;
-import org.sonatype.aether.resolution.ArtifactResolutionException;
-import org.sonatype.aether.resolution.ArtifactResult;
-import org.sonatype.aether.resolution.DependencyRequest;
-import org.sonatype.aether.resolution.DependencyResolutionException;
-import org.sonatype.aether.util.artifact.DefaultArtifact;
-import org.sonatype.aether.util.artifact.JavaScopes;
-import org.sonatype.aether.util.filter.DependencyFilterUtils;
-import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
-
-
-/**
- *
- */
-public class DependencyContext {
- List<Dependency> dependencies = new LinkedList<Dependency>();
- List<Repository> repositories = new LinkedList<Repository>();
-
- List<File> files = new LinkedList<File>();
- List<File> filesDist = new LinkedList<File>();
- private RepositorySystem system = Booter.newRepositorySystem();
- private RepositorySystemSession session;
- private RemoteRepository mavenCentral = Booter.newCentralRepository();
- private RemoteRepository mavenLocal = Booter.newLocalRepository();
- private List<RemoteRepository> additionalRepos = new LinkedList<RemoteRepository>();
-
- public DependencyContext(String localRepoPath, String additionalRemoteRepository) {
- session = Booter.newRepositorySystemSession(system, localRepoPath);
- addRepoFromProperty(additionalRemoteRepository);
- }
-
- public Dependency load(String lib) {
- Dependency dep = new Dependency(lib);
-
- if (dependencies.contains(dep)) {
- dependencies.remove(dep);
- }
- dependencies.add(dep);
- return dep;
- }
-
- public Repository addRepo(String name) {
- Repository rep = new Repository(name);
- repositories.add(rep);
- return rep;
- }
-
- public void reset() {
- dependencies = new LinkedList<Dependency>();
- repositories = new LinkedList<Repository>();
-
- files = new LinkedList<File>();
- filesDist = new LinkedList<File>();
- }
-
- private void addRepoFromProperty(String listOfRepo) {
- if (listOfRepo != null) {
- String[] repos = listOfRepo.split(";");
- for (String repo : repos) {
- String[] parts = repo.split(",");
- if (parts.length == 3) {
- String id = parts[0].trim();
- String url = parts[1].trim();
- boolean isSnapshot = Boolean.parseBoolean(parts[2].trim());
- if (id.length() > 1 && url.length() > 1) {
- RemoteRepository rr = new RemoteRepository(id, "default", url);
- rr.setPolicy(isSnapshot, null);
- additionalRepos.add(rr);
- }
- }
- }
- }
- }
-
- /**
- * fetch all artifacts
- * @return
- * @throws MalformedURLException
- * @throws ArtifactResolutionException
- * @throws DependencyResolutionException
- */
- public List<File> fetch() throws MalformedURLException,
- DependencyResolutionException, ArtifactResolutionException {
-
- for (Dependency dep : dependencies) {
- if (!dep.isLocalFsArtifact()) {
- List<ArtifactResult> artifacts = fetchArtifactWithDep(dep);
- for (ArtifactResult artifact : artifacts) {
- if (dep.isDist()) {
- filesDist.add(artifact.getArtifact().getFile());
- }
- files.add(artifact.getArtifact().getFile());
- }
- } else {
- if (dep.isDist()) {
- filesDist.add(new File(dep.getGroupArtifactVersion()));
- }
- files.add(new File(dep.getGroupArtifactVersion()));
- }
- }
-
- return files;
- }
-
- private List<ArtifactResult> fetchArtifactWithDep(Dependency dep)
- throws DependencyResolutionException, ArtifactResolutionException {
- Artifact artifact = new DefaultArtifact(
- DependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion()));
-
- DependencyFilter classpathFlter = DependencyFilterUtils
- .classpathFilter(JavaScopes.COMPILE);
- PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter(
- DependencyResolver.inferScalaVersion(dep.getExclusions()));
-
- CollectRequest collectRequest = new CollectRequest();
- collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact,
- JavaScopes.COMPILE));
-
- collectRequest.addRepository(mavenCentral);
- collectRequest.addRepository(mavenLocal);
- for (RemoteRepository repo : additionalRepos) {
- collectRequest.addRepository(repo);
- }
- for (Repository repo : repositories) {
- RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl());
- rr.setPolicy(repo.isSnapshot(), null);
- Authentication auth = repo.getAuthentication();
- if (auth != null) {
- rr.setAuthentication(auth);
- }
- collectRequest.addRepository(rr);
- }
-
- DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
- DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
-
- return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
- }
-
- public List<File> getFiles() {
- return files;
- }
-
- public List<File> getFilesDist() {
- return filesDist;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
deleted file mode 100644
index eed3924..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkContext;
-import org.apache.spark.repl.SparkIMain;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.RepositorySystemSession;
-import org.sonatype.aether.artifact.Artifact;
-import org.sonatype.aether.collection.CollectRequest;
-import org.sonatype.aether.graph.Dependency;
-import org.sonatype.aether.graph.DependencyFilter;
-import org.sonatype.aether.repository.RemoteRepository;
-import org.sonatype.aether.resolution.ArtifactResult;
-import org.sonatype.aether.resolution.DependencyRequest;
-import org.sonatype.aether.util.artifact.DefaultArtifact;
-import org.sonatype.aether.util.artifact.JavaScopes;
-import org.sonatype.aether.util.filter.DependencyFilterUtils;
-import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
-
-import scala.Some;
-import scala.collection.IndexedSeq;
-import scala.reflect.io.AbstractFile;
-import scala.tools.nsc.Global;
-import scala.tools.nsc.backend.JavaPlatform;
-import scala.tools.nsc.util.ClassPath;
-import scala.tools.nsc.util.MergedClassPath;
-
-/**
- * Deps resolver.
- * Add new dependencies from mvn repo (at runetime) to Zeppelin.
- */
-public class DependencyResolver {
- Logger logger = LoggerFactory.getLogger(DependencyResolver.class);
- private Global global;
- private SparkIMain intp;
- private SparkContext sc;
- private RepositorySystem system = Booter.newRepositorySystem();
- private List<RemoteRepository> repos = new LinkedList<RemoteRepository>();
- private RepositorySystemSession session;
- private DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter(
- JavaScopes.COMPILE,
- JavaScopes.PROVIDED,
- JavaScopes.RUNTIME,
- JavaScopes.SYSTEM);
-
- private final String[] exclusions = new String[] {"org.scala-lang:scala-library",
- "org.scala-lang:scala-compiler",
- "org.scala-lang:scala-reflect",
- "org.scala-lang:scalap",
- "org.apache.zeppelin:zeppelin-zengine",
- "org.apache.zeppelin:zeppelin-spark",
- "org.apache.zeppelin:zeppelin-server"};
-
- public DependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath,
- String additionalRemoteRepository) {
- this.intp = intp;
- this.global = intp.global();
- this.sc = sc;
- session = Booter.newRepositorySystemSession(system, localRepoPath);
- repos.add(Booter.newCentralRepository()); // add maven central
- repos.add(Booter.newLocalRepository());
- addRepoFromProperty(additionalRemoteRepository);
- }
-
- public void addRepo(String id, String url, boolean snapshot) {
- synchronized (repos) {
- delRepo(id);
- RemoteRepository rr = new RemoteRepository(id, "default", url);
- rr.setPolicy(snapshot, null);
- repos.add(rr);
- }
- }
-
- public RemoteRepository delRepo(String id) {
- synchronized (repos) {
- Iterator<RemoteRepository> it = repos.iterator();
- if (it.hasNext()) {
- RemoteRepository repo = it.next();
- if (repo.getId().equals(id)) {
- it.remove();
- return repo;
- }
- }
- }
- return null;
- }
-
- private void addRepoFromProperty(String listOfRepo) {
- if (listOfRepo != null) {
- String[] repos = listOfRepo.split(";");
- for (String repo : repos) {
- String[] parts = repo.split(",");
- if (parts.length == 3) {
- String id = parts[0].trim();
- String url = parts[1].trim();
- boolean isSnapshot = Boolean.parseBoolean(parts[2].trim());
- if (id.length() > 1 && url.length() > 1) {
- addRepo(id, url, isSnapshot);
- }
- }
- }
- }
- }
-
- private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException,
- IllegalArgumentException, InvocationTargetException {
-
- JavaPlatform platform = (JavaPlatform) global.platform();
- MergedClassPath<AbstractFile> newClassPath = mergeUrlsIntoClassPath(platform, urls);
-
- Method[] methods = platform.getClass().getMethods();
- for (Method m : methods) {
- if (m.getName().endsWith("currentClassPath_$eq")) {
- m.invoke(platform, new Some(newClassPath));
- break;
- }
- }
-
- // NOTE: Must use reflection until this is exposed/fixed upstream in Scala
- List<String> classPaths = new LinkedList<String>();
- for (URL url : urls) {
- classPaths.add(url.getPath());
- }
-
- // Reload all jars specified into our compiler
- global.invalidateClassPathEntries(scala.collection.JavaConversions.asScalaBuffer(classPaths)
- .toList());
- }
-
- // Until spark 1.1.x
- // check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7
- private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException,
- IllegalAccessException, IllegalArgumentException,
- InvocationTargetException, NoSuchMethodException {
- ClassLoader cl = intp.classLoader().getParent();
- Method addURL;
- addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class});
- addURL.setAccessible(true);
- for (URL url : urls) {
- addURL.invoke(cl, url);
- }
- }
-
- private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException,
- IllegalAccessException, IllegalArgumentException,
- InvocationTargetException, NoSuchMethodException {
- ClassLoader cl = intp.classLoader().getParent();
- Method addURL;
- addURL = cl.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class});
- addURL.setAccessible(true);
- for (URL url : urls) {
- addURL.invoke(cl, url);
- }
- }
-
- private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
- IndexedSeq<ClassPath<AbstractFile>> entries =
- ((MergedClassPath<AbstractFile>) platform.classPath()).entries();
- List<ClassPath<AbstractFile>> cp = new LinkedList<ClassPath<AbstractFile>>();
-
- for (int i = 0; i < entries.size(); i++) {
- cp.add(entries.apply(i));
- }
-
- for (URL url : urls) {
- AbstractFile file;
- if ("file".equals(url.getProtocol())) {
- File f = new File(url.getPath());
- if (f.isDirectory()) {
- file = AbstractFile.getDirectory(scala.reflect.io.File.jfile2path(f));
- } else {
- file = AbstractFile.getFile(scala.reflect.io.File.jfile2path(f));
- }
- } else {
- file = AbstractFile.getURL(url);
- }
-
- ClassPath<AbstractFile> newcp = platform.classPath().context().newClassPath(file);
-
- // distinct
- if (cp.contains(newcp) == false) {
- cp.add(newcp);
- }
- }
-
- return new MergedClassPath(scala.collection.JavaConversions.asScalaBuffer(cp).toIndexedSeq(),
- platform.classPath().context());
- }
-
- public List<String> load(String artifact,
- boolean addSparkContext) throws Exception {
- return load(artifact, new LinkedList<String>(), addSparkContext);
- }
-
- public List<String> load(String artifact, Collection<String> excludes,
- boolean addSparkContext) throws Exception {
- if (StringUtils.isBlank(artifact)) {
- // Should throw here
- throw new RuntimeException("Invalid artifact to load");
- }
-
- // <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
- int numSplits = artifact.split(":").length;
- if (numSplits >= 3 && numSplits <= 6) {
- return loadFromMvn(artifact, excludes, addSparkContext);
- } else {
- loadFromFs(artifact, addSparkContext);
- LinkedList<String> libs = new LinkedList<String>();
- libs.add(artifact);
- return libs;
- }
- }
-
- private void loadFromFs(String artifact, boolean addSparkContext) throws Exception {
- File jarFile = new File(artifact);
-
- intp.global().new Run();
-
- if (sc.version().startsWith("1.1")) {
- updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()});
- } else {
- updateRuntimeClassPath_2_x(new URL[] {jarFile.toURI().toURL()});
- }
-
- if (addSparkContext) {
- sc.addJar(jarFile.getAbsolutePath());
- }
- }
-
- private List<String> loadFromMvn(String artifact, Collection<String> excludes,
- boolean addSparkContext) throws Exception {
- List<String> loadedLibs = new LinkedList<String>();
- Collection<String> allExclusions = new LinkedList<String>();
- allExclusions.addAll(excludes);
- allExclusions.addAll(Arrays.asList(exclusions));
-
- List<ArtifactResult> listOfArtifact;
- listOfArtifact = getArtifactsWithDep(artifact, allExclusions);
-
- Iterator<ArtifactResult> it = listOfArtifact.iterator();
- while (it.hasNext()) {
- Artifact a = it.next().getArtifact();
- String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion();
- for (String exclude : allExclusions) {
- if (gav.startsWith(exclude)) {
- it.remove();
- break;
- }
- }
- }
-
- List<URL> newClassPathList = new LinkedList<URL>();
- List<File> files = new LinkedList<File>();
- for (ArtifactResult artifactResult : listOfArtifact) {
- logger.info("Load " + artifactResult.getArtifact().getGroupId() + ":"
- + artifactResult.getArtifact().getArtifactId() + ":"
- + artifactResult.getArtifact().getVersion());
- newClassPathList.add(artifactResult.getArtifact().getFile().toURI().toURL());
- files.add(artifactResult.getArtifact().getFile());
- loadedLibs.add(artifactResult.getArtifact().getGroupId() + ":"
- + artifactResult.getArtifact().getArtifactId() + ":"
- + artifactResult.getArtifact().getVersion());
- }
-
- intp.global().new Run();
- if (sc.version().startsWith("1.1")) {
- updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0]));
- } else {
- updateRuntimeClassPath_2_x(newClassPathList.toArray(new URL[0]));
- }
- updateCompilerClassPath(newClassPathList.toArray(new URL[0]));
-
- if (addSparkContext) {
- for (File f : files) {
- sc.addJar(f.getAbsolutePath());
- }
- }
-
- return loadedLibs;
- }
-
- /**
- *
- * @param dependency
- * @param excludes list of pattern can either be of the form groupId:artifactId
- * @return
- * @throws Exception
- */
- public List<ArtifactResult> getArtifactsWithDep(String dependency,
- Collection<String> excludes) throws Exception {
- Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency));
- DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter( JavaScopes.COMPILE );
- PatternExclusionsDependencyFilter exclusionFilter =
- new PatternExclusionsDependencyFilter(inferScalaVersion(excludes));
-
- CollectRequest collectRequest = new CollectRequest();
- collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE));
-
- synchronized (repos) {
- for (RemoteRepository repo : repos) {
- collectRequest.addRepository(repo);
- }
- }
- DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
- DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
- return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
- }
-
- public static Collection<String> inferScalaVersion(Collection<String> artifact) {
- List<String> list = new LinkedList<String>();
- for (String a : artifact) {
- list.add(inferScalaVersion(a));
- }
- return list;
- }
-
- public static String inferScalaVersion(String artifact) {
- int pos = artifact.indexOf(":");
- if (pos < 0 || pos + 2 >= artifact.length()) {
- // failed to infer
- return artifact;
- }
-
- if (':' == artifact.charAt(pos + 1)) {
- String restOfthem = "";
- String versionSep = ":";
-
- String groupId = artifact.substring(0, pos);
- int nextPos = artifact.indexOf(":", pos + 2);
- if (nextPos < 0) {
- if (artifact.charAt(artifact.length() - 1) == '*') {
- nextPos = artifact.length() - 1;
- versionSep = "";
- restOfthem = "*";
- } else {
- versionSep = "";
- nextPos = artifact.length();
- }
- }
-
- String artifactId = artifact.substring(pos + 2, nextPos);
- if (nextPos < artifact.length()) {
- if (!restOfthem.equals("*")) {
- restOfthem = artifact.substring(nextPos + 1);
- }
- }
-
- String [] version = scala.util.Properties.versionNumberString().split("[.]");
- String scalaVersion = version[0] + "." + version[1];
-
- return groupId + ":" + artifactId + "_" + scalaVersion + versionSep + restOfthem;
- } else {
- return artifact;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/Repository.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/Repository.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/Repository.java
deleted file mode 100644
index aee56b5..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/Repository.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-import org.sonatype.aether.repository.Authentication;
-/**
- *
- *
- */
-public class Repository {
- private boolean snapshot = false;
- private String name;
- private String url;
- private String username = null;
- private String password = null;
-
- public Repository(String name){
- this.name = name;
- }
-
- public Repository url(String url) {
- this.url = url;
- return this;
- }
-
- public Repository snapshot() {
- snapshot = true;
- return this;
- }
-
- public boolean isSnapshot() {
- return snapshot;
- }
-
- public String getName() {
- return name;
- }
-
- public String getUrl() {
- return url;
- }
-
- public Repository username(String username) {
- this.username = username;
- return this;
- }
-
- public Repository password(String password) {
- this.password = password;
- return this;
- }
-
- public Repository credentials(String username, String password) {
- this.username = username;
- this.password = password;
- return this;
- }
-
- protected Authentication getAuthentication() {
- Authentication auth = null;
- if (this.username != null && this.password != null) {
- auth = new Authentication(this.username, this.password);
- }
- return auth;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositoryListener.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositoryListener.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositoryListener.java
deleted file mode 100644
index 86b3334..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositoryListener.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.AbstractRepositoryListener;
-import org.sonatype.aether.RepositoryEvent;
-
-/**
- * Simple listener that print log.
- */
-public class RepositoryListener extends AbstractRepositoryListener {
- Logger logger = LoggerFactory.getLogger(RepositoryListener.class);
-
- public RepositoryListener() {}
-
- @Override
- public void artifactDeployed(RepositoryEvent event) {
- logger.info("Deployed " + event.getArtifact() + " to " + event.getRepository());
- }
-
- @Override
- public void artifactDeploying(RepositoryEvent event) {
- logger.info("Deploying " + event.getArtifact() + " to " + event.getRepository());
- }
-
- @Override
- public void artifactDescriptorInvalid(RepositoryEvent event) {
- logger.info("Invalid artifact descriptor for " + event.getArtifact() + ": "
- + event.getException().getMessage());
- }
-
- @Override
- public void artifactDescriptorMissing(RepositoryEvent event) {
- logger.info("Missing artifact descriptor for " + event.getArtifact());
- }
-
- @Override
- public void artifactInstalled(RepositoryEvent event) {
- logger.info("Installed " + event.getArtifact() + " to " + event.getFile());
- }
-
- @Override
- public void artifactInstalling(RepositoryEvent event) {
- logger.info("Installing " + event.getArtifact() + " to " + event.getFile());
- }
-
- @Override
- public void artifactResolved(RepositoryEvent event) {
- logger.info("Resolved artifact " + event.getArtifact() + " from " + event.getRepository());
- }
-
- @Override
- public void artifactDownloading(RepositoryEvent event) {
- logger.info("Downloading artifact " + event.getArtifact() + " from " + event.getRepository());
- }
-
- @Override
- public void artifactDownloaded(RepositoryEvent event) {
- logger.info("Downloaded artifact " + event.getArtifact() + " from " + event.getRepository());
- }
-
- @Override
- public void artifactResolving(RepositoryEvent event) {
- logger.info("Resolving artifact " + event.getArtifact());
- }
-
- @Override
- public void metadataDeployed(RepositoryEvent event) {
- logger.info("Deployed " + event.getMetadata() + " to " + event.getRepository());
- }
-
- @Override
- public void metadataDeploying(RepositoryEvent event) {
- logger.info("Deploying " + event.getMetadata() + " to " + event.getRepository());
- }
-
- @Override
- public void metadataInstalled(RepositoryEvent event) {
- logger.info("Installed " + event.getMetadata() + " to " + event.getFile());
- }
-
- @Override
- public void metadataInstalling(RepositoryEvent event) {
- logger.info("Installing " + event.getMetadata() + " to " + event.getFile());
- }
-
- @Override
- public void metadataInvalid(RepositoryEvent event) {
- logger.info("Invalid metadata " + event.getMetadata());
- }
-
- @Override
- public void metadataResolved(RepositoryEvent event) {
- logger.info("Resolved metadata " + event.getMetadata() + " from " + event.getRepository());
- }
-
- @Override
- public void metadataResolving(RepositoryEvent event) {
- logger.info("Resolving metadata " + event.getMetadata() + " from " + event.getRepository());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositorySystemFactory.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositorySystemFactory.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositorySystemFactory.java
deleted file mode 100644
index cc0740d..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositorySystemFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import org.apache.maven.repository.internal.DefaultServiceLocator;
-import org.apache.maven.wagon.Wagon;
-import org.apache.maven.wagon.providers.http.HttpWagon;
-import org.apache.maven.wagon.providers.http.LightweightHttpWagon;
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.connector.file.FileRepositoryConnectorFactory;
-import org.sonatype.aether.connector.wagon.WagonProvider;
-import org.sonatype.aether.connector.wagon.WagonRepositoryConnectorFactory;
-import org.sonatype.aether.spi.connector.RepositoryConnectorFactory;
-
-/**
- * Get maven repository instance.
- */
-public class RepositorySystemFactory {
- public static RepositorySystem newRepositorySystem() {
- DefaultServiceLocator locator = new DefaultServiceLocator();
- locator.addService(RepositoryConnectorFactory.class, FileRepositoryConnectorFactory.class);
- locator.addService(RepositoryConnectorFactory.class, WagonRepositoryConnectorFactory.class);
- locator.setServices(WagonProvider.class, new ManualWagonProvider());
-
- return locator.getService(RepositorySystem.class);
- }
-
- /**
- * ManualWagonProvider
- */
- public static class ManualWagonProvider implements WagonProvider {
-
- @Override
- public Wagon lookup(String roleHint) throws Exception {
- if ("http".equals(roleHint)) {
- return new LightweightHttpWagon();
- }
-
- if ("https".equals(roleHint)) {
- return new HttpWagon();
- }
-
- return null;
- }
-
- @Override
- public void release(Wagon arg0) {
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java
new file mode 100644
index 0000000..1b20b0f
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark.dep;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.zeppelin.dep.Booter;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.Repository;
+
+import org.sonatype.aether.RepositorySystem;
+import org.sonatype.aether.RepositorySystemSession;
+import org.sonatype.aether.artifact.Artifact;
+import org.sonatype.aether.collection.CollectRequest;
+import org.sonatype.aether.graph.DependencyFilter;
+import org.sonatype.aether.repository.RemoteRepository;
+import org.sonatype.aether.repository.Authentication;
+import org.sonatype.aether.resolution.ArtifactResolutionException;
+import org.sonatype.aether.resolution.ArtifactResult;
+import org.sonatype.aether.resolution.DependencyRequest;
+import org.sonatype.aether.resolution.DependencyResolutionException;
+import org.sonatype.aether.util.artifact.DefaultArtifact;
+import org.sonatype.aether.util.artifact.JavaScopes;
+import org.sonatype.aether.util.filter.DependencyFilterUtils;
+import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
+
+
+/**
+ *
+ */
+public class SparkDependencyContext {
+ List<Dependency> dependencies = new LinkedList<Dependency>();
+ List<Repository> repositories = new LinkedList<Repository>();
+
+ List<File> files = new LinkedList<File>();
+ List<File> filesDist = new LinkedList<File>();
+ private RepositorySystem system = Booter.newRepositorySystem();
+ private RepositorySystemSession session;
+ private RemoteRepository mavenCentral = Booter.newCentralRepository();
+ private RemoteRepository mavenLocal = Booter.newLocalRepository();
+ private List<RemoteRepository> additionalRepos = new LinkedList<RemoteRepository>();
+
+ public SparkDependencyContext(String localRepoPath, String additionalRemoteRepository) {
+ session = Booter.newRepositorySystemSession(system, localRepoPath);
+ addRepoFromProperty(additionalRemoteRepository);
+ }
+
+ public Dependency load(String lib) {
+ Dependency dep = new Dependency(lib);
+
+ if (dependencies.contains(dep)) {
+ dependencies.remove(dep);
+ }
+ dependencies.add(dep);
+ return dep;
+ }
+
+ public Repository addRepo(String name) {
+ Repository rep = new Repository(name);
+ repositories.add(rep);
+ return rep;
+ }
+
+ public void reset() {
+ dependencies = new LinkedList<Dependency>();
+ repositories = new LinkedList<Repository>();
+
+ files = new LinkedList<File>();
+ filesDist = new LinkedList<File>();
+ }
+
+ private void addRepoFromProperty(String listOfRepo) {
+ if (listOfRepo != null) {
+ String[] repos = listOfRepo.split(";");
+ for (String repo : repos) {
+ String[] parts = repo.split(",");
+ if (parts.length == 3) {
+ String id = parts[0].trim();
+ String url = parts[1].trim();
+ boolean isSnapshot = Boolean.parseBoolean(parts[2].trim());
+ if (id.length() > 1 && url.length() > 1) {
+ RemoteRepository rr = new RemoteRepository(id, "default", url);
+ rr.setPolicy(isSnapshot, null);
+ additionalRepos.add(rr);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * fetch all artifacts
+ * @return
+ * @throws MalformedURLException
+ * @throws ArtifactResolutionException
+ * @throws DependencyResolutionException
+ */
+ public List<File> fetch() throws MalformedURLException,
+ DependencyResolutionException, ArtifactResolutionException {
+
+ for (Dependency dep : dependencies) {
+ if (!dep.isLocalFsArtifact()) {
+ List<ArtifactResult> artifacts = fetchArtifactWithDep(dep);
+ for (ArtifactResult artifact : artifacts) {
+ if (dep.isDist()) {
+ filesDist.add(artifact.getArtifact().getFile());
+ }
+ files.add(artifact.getArtifact().getFile());
+ }
+ } else {
+ if (dep.isDist()) {
+ filesDist.add(new File(dep.getGroupArtifactVersion()));
+ }
+ files.add(new File(dep.getGroupArtifactVersion()));
+ }
+ }
+
+ return files;
+ }
+
+ private List<ArtifactResult> fetchArtifactWithDep(Dependency dep)
+ throws DependencyResolutionException, ArtifactResolutionException {
+ Artifact artifact = new DefaultArtifact(
+ SparkDependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion()));
+
+ DependencyFilter classpathFlter = DependencyFilterUtils
+ .classpathFilter(JavaScopes.COMPILE);
+ PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter(
+ SparkDependencyResolver.inferScalaVersion(dep.getExclusions()));
+
+ CollectRequest collectRequest = new CollectRequest();
+ collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact,
+ JavaScopes.COMPILE));
+
+ collectRequest.addRepository(mavenCentral);
+ collectRequest.addRepository(mavenLocal);
+ for (RemoteRepository repo : additionalRepos) {
+ collectRequest.addRepository(repo);
+ }
+ for (Repository repo : repositories) {
+ RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl());
+ rr.setPolicy(repo.isSnapshot(), null);
+ Authentication auth = repo.getAuthentication();
+ if (auth != null) {
+ rr.setAuthentication(auth);
+ }
+ collectRequest.addRepository(rr);
+ }
+
+ DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
+ DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
+
+ return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
+ }
+
+ public List<File> getFiles() {
+ return files;
+ }
+
+ public List<File> getFilesDist() {
+ return filesDist;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java
new file mode 100644
index 0000000..e4881d3
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark.dep;
+
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.SparkContext;
+import org.apache.spark.repl.SparkIMain;
+import org.apache.zeppelin.dep.AbstractDependencyResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.artifact.Artifact;
+import org.sonatype.aether.collection.CollectRequest;
+import org.sonatype.aether.graph.Dependency;
+import org.sonatype.aether.graph.DependencyFilter;
+import org.sonatype.aether.repository.RemoteRepository;
+import org.sonatype.aether.resolution.ArtifactResult;
+import org.sonatype.aether.resolution.DependencyRequest;
+import org.sonatype.aether.util.artifact.DefaultArtifact;
+import org.sonatype.aether.util.artifact.JavaScopes;
+import org.sonatype.aether.util.filter.DependencyFilterUtils;
+import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
+
+import scala.Some;
+import scala.collection.IndexedSeq;
+import scala.reflect.io.AbstractFile;
+import scala.tools.nsc.Global;
+import scala.tools.nsc.backend.JavaPlatform;
+import scala.tools.nsc.util.ClassPath;
+import scala.tools.nsc.util.MergedClassPath;
+
+/**
+ * Deps resolver.
+ * Add new dependencies from mvn repo (at runtime) to Spark interpreter group.
+ */
+public class SparkDependencyResolver extends AbstractDependencyResolver {
+ Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class);
+ private Global global;
+ private SparkIMain intp;
+ private SparkContext sc;
+
+ private final String[] exclusions = new String[] {"org.scala-lang:scala-library",
+ "org.scala-lang:scala-compiler",
+ "org.scala-lang:scala-reflect",
+ "org.scala-lang:scalap",
+ "org.apache.zeppelin:zeppelin-zengine",
+ "org.apache.zeppelin:zeppelin-spark",
+ "org.apache.zeppelin:zeppelin-server"};
+
+ public SparkDependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath,
+ String additionalRemoteRepository) {
+ super(localRepoPath);
+ this.intp = intp;
+ this.global = intp.global();
+ this.sc = sc;
+ addRepoFromProperty(additionalRemoteRepository);
+ }
+
+ private void addRepoFromProperty(String listOfRepo) {
+ if (listOfRepo != null) {
+ String[] repos = listOfRepo.split(";");
+ for (String repo : repos) {
+ String[] parts = repo.split(",");
+ if (parts.length == 3) {
+ String id = parts[0].trim();
+ String url = parts[1].trim();
+ boolean isSnapshot = Boolean.parseBoolean(parts[2].trim());
+ if (id.length() > 1 && url.length() > 1) {
+ addRepo(id, url, isSnapshot);
+ }
+ }
+ }
+ }
+ }
+
+ private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException,
+ IllegalArgumentException, InvocationTargetException {
+
+ JavaPlatform platform = (JavaPlatform) global.platform();
+ MergedClassPath<AbstractFile> newClassPath = mergeUrlsIntoClassPath(platform, urls);
+
+ Method[] methods = platform.getClass().getMethods();
+ for (Method m : methods) {
+ if (m.getName().endsWith("currentClassPath_$eq")) {
+ m.invoke(platform, new Some(newClassPath));
+ break;
+ }
+ }
+
+ // NOTE: Must use reflection until this is exposed/fixed upstream in Scala
+ List<String> classPaths = new LinkedList<String>();
+ for (URL url : urls) {
+ classPaths.add(url.getPath());
+ }
+
+ // Reload all jars specified into our compiler
+ global.invalidateClassPathEntries(scala.collection.JavaConversions.asScalaBuffer(classPaths)
+ .toList());
+ }
+
+ // Until spark 1.1.x
+ // check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7
+ private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException,
+ IllegalAccessException, IllegalArgumentException,
+ InvocationTargetException, NoSuchMethodException {
+ ClassLoader cl = intp.classLoader().getParent();
+ Method addURL;
+ addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class});
+ addURL.setAccessible(true);
+ for (URL url : urls) {
+ addURL.invoke(cl, url);
+ }
+ }
+
+ private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException,
+ IllegalAccessException, IllegalArgumentException,
+ InvocationTargetException, NoSuchMethodException {
+ ClassLoader cl = intp.classLoader().getParent();
+ Method addURL;
+ addURL = cl.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class});
+ addURL.setAccessible(true);
+ for (URL url : urls) {
+ addURL.invoke(cl, url);
+ }
+ }
+
+ private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
+ IndexedSeq<ClassPath<AbstractFile>> entries =
+ ((MergedClassPath<AbstractFile>) platform.classPath()).entries();
+ List<ClassPath<AbstractFile>> cp = new LinkedList<ClassPath<AbstractFile>>();
+
+ for (int i = 0; i < entries.size(); i++) {
+ cp.add(entries.apply(i));
+ }
+
+ for (URL url : urls) {
+ AbstractFile file;
+ if ("file".equals(url.getProtocol())) {
+ File f = new File(url.getPath());
+ if (f.isDirectory()) {
+ file = AbstractFile.getDirectory(scala.reflect.io.File.jfile2path(f));
+ } else {
+ file = AbstractFile.getFile(scala.reflect.io.File.jfile2path(f));
+ }
+ } else {
+ file = AbstractFile.getURL(url);
+ }
+
+ ClassPath<AbstractFile> newcp = platform.classPath().context().newClassPath(file);
+
+ // distinct
+ if (cp.contains(newcp) == false) {
+ cp.add(newcp);
+ }
+ }
+
+ return new MergedClassPath(scala.collection.JavaConversions.asScalaBuffer(cp).toIndexedSeq(),
+ platform.classPath().context());
+ }
+
+ public List<String> load(String artifact,
+ boolean addSparkContext) throws Exception {
+ return load(artifact, new LinkedList<String>(), addSparkContext);
+ }
+
+ public List<String> load(String artifact, Collection<String> excludes,
+ boolean addSparkContext) throws Exception {
+ if (StringUtils.isBlank(artifact)) {
+ // Should throw here
+ throw new RuntimeException("Invalid artifact to load");
+ }
+
+ // <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
+ int numSplits = artifact.split(":").length;
+ if (numSplits >= 3 && numSplits <= 6) {
+ return loadFromMvn(artifact, excludes, addSparkContext);
+ } else {
+ loadFromFs(artifact, addSparkContext);
+ LinkedList<String> libs = new LinkedList<String>();
+ libs.add(artifact);
+ return libs;
+ }
+ }
+
+ private void loadFromFs(String artifact, boolean addSparkContext) throws Exception {
+ File jarFile = new File(artifact);
+
+ intp.global().new Run();
+
+ if (sc.version().startsWith("1.1")) {
+ updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()});
+ } else {
+ updateRuntimeClassPath_2_x(new URL[] {jarFile.toURI().toURL()});
+ }
+
+ if (addSparkContext) {
+ sc.addJar(jarFile.getAbsolutePath());
+ }
+ }
+
+ private List<String> loadFromMvn(String artifact, Collection<String> excludes,
+ boolean addSparkContext) throws Exception {
+ List<String> loadedLibs = new LinkedList<String>();
+ Collection<String> allExclusions = new LinkedList<String>();
+ allExclusions.addAll(excludes);
+ allExclusions.addAll(Arrays.asList(exclusions));
+
+ List<ArtifactResult> listOfArtifact;
+ listOfArtifact = getArtifactsWithDep(artifact, allExclusions);
+
+ Iterator<ArtifactResult> it = listOfArtifact.iterator();
+ while (it.hasNext()) {
+ Artifact a = it.next().getArtifact();
+ String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion();
+ for (String exclude : allExclusions) {
+ if (gav.startsWith(exclude)) {
+ it.remove();
+ break;
+ }
+ }
+ }
+
+ List<URL> newClassPathList = new LinkedList<URL>();
+ List<File> files = new LinkedList<File>();
+ for (ArtifactResult artifactResult : listOfArtifact) {
+ logger.info("Load " + artifactResult.getArtifact().getGroupId() + ":"
+ + artifactResult.getArtifact().getArtifactId() + ":"
+ + artifactResult.getArtifact().getVersion());
+ newClassPathList.add(artifactResult.getArtifact().getFile().toURI().toURL());
+ files.add(artifactResult.getArtifact().getFile());
+ loadedLibs.add(artifactResult.getArtifact().getGroupId() + ":"
+ + artifactResult.getArtifact().getArtifactId() + ":"
+ + artifactResult.getArtifact().getVersion());
+ }
+
+ intp.global().new Run();
+ if (sc.version().startsWith("1.1")) {
+ updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0]));
+ } else {
+ updateRuntimeClassPath_2_x(newClassPathList.toArray(new URL[0]));
+ }
+ updateCompilerClassPath(newClassPathList.toArray(new URL[0]));
+
+ if (addSparkContext) {
+ for (File f : files) {
+ sc.addJar(f.getAbsolutePath());
+ }
+ }
+
+ return loadedLibs;
+ }
+
+ /**
+ * @param dependency
+ * @param excludes list of pattern can either be of the form groupId:artifactId
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public List<ArtifactResult> getArtifactsWithDep(String dependency,
+ Collection<String> excludes) throws Exception {
+ Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency));
+ DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE);
+ PatternExclusionsDependencyFilter exclusionFilter =
+ new PatternExclusionsDependencyFilter(inferScalaVersion(excludes));
+
+ CollectRequest collectRequest = new CollectRequest();
+ collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE));
+
+ synchronized (repos) {
+ for (RemoteRepository repo : repos) {
+ collectRequest.addRepository(repo);
+ }
+ }
+ DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
+ DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter));
+ return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
+ }
+
+ public static Collection<String> inferScalaVersion(Collection<String> artifact) {
+ List<String> list = new LinkedList<String>();
+ for (String a : artifact) {
+ list.add(inferScalaVersion(a));
+ }
+ return list;
+ }
+
+ public static String inferScalaVersion(String artifact) {
+ int pos = artifact.indexOf(":");
+ if (pos < 0 || pos + 2 >= artifact.length()) {
+ // failed to infer
+ return artifact;
+ }
+
+ if (':' == artifact.charAt(pos + 1)) {
+ String restOfthem = "";
+ String versionSep = ":";
+
+ String groupId = artifact.substring(0, pos);
+ int nextPos = artifact.indexOf(":", pos + 2);
+ if (nextPos < 0) {
+ if (artifact.charAt(artifact.length() - 1) == '*') {
+ nextPos = artifact.length() - 1;
+ versionSep = "";
+ restOfthem = "*";
+ } else {
+ versionSep = "";
+ nextPos = artifact.length();
+ }
+ }
+
+ String artifactId = artifact.substring(pos + 2, nextPos);
+ if (nextPos < artifact.length()) {
+ if (!restOfthem.equals("*")) {
+ restOfthem = artifact.substring(nextPos + 1);
+ }
+ }
+
+ String [] version = scala.util.Properties.versionNumberString().split("[.]");
+ String scalaVersion = version[0] + "." + version[1];
+
+ return groupId + ":" + artifactId + "_" + scalaVersion + versionSep + restOfthem;
+ } else {
+ return artifact;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/main/java/org/apache/zeppelin/spark/dep/TransferListener.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/TransferListener.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/TransferListener.java
deleted file mode 100644
index 8a4058a..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/TransferListener.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.DecimalFormatSymbols;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.transfer.AbstractTransferListener;
-import org.sonatype.aether.transfer.TransferEvent;
-import org.sonatype.aether.transfer.TransferResource;
-
-/**
- * Simple listener that show deps downloading progress.
- */
-public class TransferListener extends AbstractTransferListener {
- Logger logger = LoggerFactory.getLogger(TransferListener.class);
- private PrintStream out;
-
- private Map<TransferResource, Long> downloads = new ConcurrentHashMap<TransferResource, Long>();
-
- private int lastLength;
-
- public TransferListener() {}
-
- @Override
- public void transferInitiated(TransferEvent event) {
- String message =
- event.getRequestType() == TransferEvent.RequestType.PUT ? "Uploading" : "Downloading";
-
- logger.info(message + ": " + event.getResource().getRepositoryUrl()
- + event.getResource().getResourceName());
- }
-
- @Override
- public void transferProgressed(TransferEvent event) {
- TransferResource resource = event.getResource();
- downloads.put(resource, Long.valueOf(event.getTransferredBytes()));
-
- StringBuilder buffer = new StringBuilder(64);
-
- for (Map.Entry<TransferResource, Long> entry : downloads.entrySet()) {
- long total = entry.getKey().getContentLength();
- long complete = entry.getValue().longValue();
-
- buffer.append(getStatus(complete, total)).append(" ");
- }
-
- int pad = lastLength - buffer.length();
- lastLength = buffer.length();
- pad(buffer, pad);
- buffer.append('\r');
-
- logger.info(buffer.toString());
- }
-
- private String getStatus(long complete, long total) {
- if (total >= 1024) {
- return toKB(complete) + "/" + toKB(total) + " KB ";
- } else if (total >= 0) {
- return complete + "/" + total + " B ";
- } else if (complete >= 1024) {
- return toKB(complete) + " KB ";
- } else {
- return complete + " B ";
- }
- }
-
- private void pad(StringBuilder buffer, int spaces) {
- String block = " ";
- while (spaces > 0) {
- int n = Math.min(spaces, block.length());
- buffer.append(block, 0, n);
- spaces -= n;
- }
- }
-
- @Override
- public void transferSucceeded(TransferEvent event) {
- transferCompleted(event);
-
- TransferResource resource = event.getResource();
- long contentLength = event.getTransferredBytes();
- if (contentLength >= 0) {
- String type =
- (event.getRequestType() == TransferEvent.RequestType.PUT ? "Uploaded" : "Downloaded");
- String len = contentLength >= 1024 ? toKB(contentLength) + " KB" : contentLength + " B";
-
- String throughput = "";
- long duration = System.currentTimeMillis() - resource.getTransferStartTime();
- if (duration > 0) {
- DecimalFormat format = new DecimalFormat("0.0", new DecimalFormatSymbols(Locale.ENGLISH));
- double kbPerSec = (contentLength / 1024.0) / (duration / 1000.0);
- throughput = " at " + format.format(kbPerSec) + " KB/sec";
- }
-
- logger.info(type + ": " + resource.getRepositoryUrl() + resource.getResourceName() + " ("
- + len + throughput + ")");
- }
- }
-
- @Override
- public void transferFailed(TransferEvent event) {
- transferCompleted(event);
- event.getException().printStackTrace(out);
- }
-
- private void transferCompleted(TransferEvent event) {
- downloads.remove(event.getResource());
- StringBuilder buffer = new StringBuilder(64);
- pad(buffer, lastLength);
- buffer.append('\r');
- logger.info(buffer.toString());
- }
-
- @Override
- public void transferCorrupted(TransferEvent event) {
- event.getException().printStackTrace(out);
- }
-
- protected long toKB(long bytes) {
- return (bytes + 1023) / 1024;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java b/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java
deleted file mode 100644
index e41de60..0000000
--- a/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.zeppelin.spark.dep.DependencyResolver;
-import org.junit.Test;
-
-public class DependencyResolverTest {
-
- @Test
- public void testInferScalaVersion() {
- String [] version = scala.util.Properties.versionNumberString().split("[.]");
- String scalaVersion = version[0] + "." + version[1];
-
- assertEquals("groupId:artifactId:version",
- DependencyResolver.inferScalaVersion("groupId:artifactId:version"));
- assertEquals("groupId:artifactId_" + scalaVersion + ":version",
- DependencyResolver.inferScalaVersion("groupId::artifactId:version"));
- assertEquals("groupId:artifactId:version::test",
- DependencyResolver.inferScalaVersion("groupId:artifactId:version::test"));
- assertEquals("*",
- DependencyResolver.inferScalaVersion("*"));
- assertEquals("groupId:*",
- DependencyResolver.inferScalaVersion("groupId:*"));
- assertEquals("groupId:artifactId*",
- DependencyResolver.inferScalaVersion("groupId:artifactId*"));
- assertEquals("groupId:artifactId_" + scalaVersion,
- DependencyResolver.inferScalaVersion("groupId::artifactId"));
- assertEquals("groupId:artifactId_" + scalaVersion + "*",
- DependencyResolver.inferScalaVersion("groupId::artifactId*"));
- assertEquals("groupId:artifactId_" + scalaVersion + ":*",
- DependencyResolver.inferScalaVersion("groupId::artifactId:*"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java
new file mode 100644
index 0000000..a0271f4
--- /dev/null
+++ b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark.dep;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
+import org.junit.Test;
+
+public class SparkDependencyResolverTest {
+
+ @Test
+ public void testInferScalaVersion() {
+ String [] version = scala.util.Properties.versionNumberString().split("[.]");
+ String scalaVersion = version[0] + "." + version[1];
+
+ assertEquals("groupId:artifactId:version",
+ SparkDependencyResolver.inferScalaVersion("groupId:artifactId:version"));
+ assertEquals("groupId:artifactId_" + scalaVersion + ":version",
+ SparkDependencyResolver.inferScalaVersion("groupId::artifactId:version"));
+ assertEquals("groupId:artifactId:version::test",
+ SparkDependencyResolver.inferScalaVersion("groupId:artifactId:version::test"));
+ assertEquals("*",
+ SparkDependencyResolver.inferScalaVersion("*"));
+ assertEquals("groupId:*",
+ SparkDependencyResolver.inferScalaVersion("groupId:*"));
+ assertEquals("groupId:artifactId*",
+ SparkDependencyResolver.inferScalaVersion("groupId:artifactId*"));
+ assertEquals("groupId:artifactId_" + scalaVersion,
+ SparkDependencyResolver.inferScalaVersion("groupId::artifactId"));
+ assertEquals("groupId:artifactId_" + scalaVersion + "*",
+ SparkDependencyResolver.inferScalaVersion("groupId::artifactId*"));
+ assertEquals("groupId:artifactId_" + scalaVersion + ":*",
+ SparkDependencyResolver.inferScalaVersion("groupId::artifactId:*"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/bc715511/zeppelin-interpreter/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 59d2cd0..67b4d5f 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -97,6 +97,123 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang.version}</version>
</dependency>
+
+ <!-- Aether :: maven dependency resolution -->
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-plugin-api</artifactId>
+ <version>3.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.codehaus.plexus</groupId>
+ <artifactId>plexus-utils</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.sonatype.sisu</groupId>
+ <artifactId>sisu-inject-plexus</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-model</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-api</artifactId>
+ <version>1.12</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-util</artifactId>
+ <version>1.12</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-impl</artifactId>
+ <version>1.12</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-aether-provider</artifactId>
+ <version>3.0.3</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-spi</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.plexus</groupId>
+ <artifactId>plexus-utils</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-connector-file</artifactId>
+ <version>1.12</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.sonatype.aether</groupId>
+ <artifactId>aether-connector-wagon</artifactId>
+ <version>1.12</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.maven.wagon</groupId>
+ <artifactId>wagon-provider-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.maven.wagon</groupId>
+ <artifactId>wagon-provider-api</artifactId>
+ <version>1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.codehaus.plexus</groupId>
+ <artifactId>plexus-utils</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.maven.wagon</groupId>
+ <artifactId>wagon-http-lightweight</artifactId>
+ <version>1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.maven.wagon</groupId>
+ <artifactId>wagon-http-shared</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.maven.wagon</groupId>
+ <artifactId>wagon-http</artifactId>
+ <version>1.0</version>
+ <exclusions>
+ </exclusions>
+ </dependency>
</dependencies>
<build>