You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2018/08/26 04:44:23 UTC
[40/49] zeppelin git commit: [ZEPPELIN-3740] Adopt `google-java-format` and `fmt-maven-plugin`
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java
----------------------------------------------------------------------
diff --git a/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java b/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java
index 24bd513..e398568 100644
--- a/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java
+++ b/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java
@@ -16,7 +16,12 @@
*/
package org.apache.zeppelin.graph.neo4j;
+import static org.junit.Assert.assertEquals;
+
import com.google.gson.Gson;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.graph.neo4j.Neo4jConnectionManager.Neo4jAuthType;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -34,12 +39,6 @@ import org.junit.runners.MethodSorters;
import org.neo4j.harness.ServerControls;
import org.neo4j.harness.TestServerBuilders;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class Neo4jCypherInterpreterTest {
@@ -55,25 +54,27 @@ public class Neo4jCypherInterpreterTest {
private static final String REL_KNOWS = "KNOWS";
private static final String CYPHER_FOREACH =
- "FOREACH (x in range(1,1000) | CREATE (:%s{name: \"name\" + x, age: %s}))";
- private static final String CHPHER_UNWIND = "UNWIND range(1,1000) as x "
- + "MATCH (n), (m) WHERE id(n) = x AND id(m) = toInt(rand() * 1000) "
- + "CREATE (n)-[:%s]->(m)";
+ "FOREACH (x in range(1,1000) | CREATE (:%s{name: \"name\" + x, age: %s}))";
+ private static final String CHPHER_UNWIND =
+ "UNWIND range(1,1000) as x "
+ + "MATCH (n), (m) WHERE id(n) = x AND id(m) = toInt(rand() * 1000) "
+ + "CREATE (n)-[:%s]->(m)";
@BeforeClass
public static void setUpNeo4jServer() throws Exception {
- server = TestServerBuilders.newInProcessBuilder()
- .withConfig("dbms.security.auth_enabled", "false")
- .withFixture(String.format(CYPHER_FOREACH, LABEL_PERSON, "x % 10"))
- .withFixture(String.format(CHPHER_UNWIND, REL_KNOWS))
- .newServer();
+ server =
+ TestServerBuilders.newInProcessBuilder()
+ .withConfig("dbms.security.auth_enabled", "false")
+ .withFixture(String.format(CYPHER_FOREACH, LABEL_PERSON, "x % 10"))
+ .withFixture(String.format(CHPHER_UNWIND, REL_KNOWS))
+ .newServer();
}
@AfterClass
public static void tearDownNeo4jServer() throws Exception {
server.close();
}
-
+
@Before
public void setUpZeppelin() {
Properties p = new Properties();
@@ -81,9 +82,8 @@ public class Neo4jCypherInterpreterTest {
p.setProperty(Neo4jConnectionManager.NEO4J_AUTH_TYPE, Neo4jAuthType.NONE.toString());
p.setProperty(Neo4jConnectionManager.NEO4J_MAX_CONCURRENCY, "50");
interpreter = new Neo4jCypherInterpreter(p);
- context = InterpreterContext.builder()
- .setInterpreterOut(new InterpreterOutput(null))
- .build();;
+ context = InterpreterContext.builder().setInterpreterOut(new InterpreterOutput(null)).build();
+ ;
}
@After
@@ -94,17 +94,18 @@ public class Neo4jCypherInterpreterTest {
@Test
public void testTableWithArray() {
interpreter.open();
- InterpreterResult result = interpreter.interpret(
- "return 'a' as colA, 'b' as colB, [1, 2, 3] as colC", context);
+ InterpreterResult result =
+ interpreter.interpret("return 'a' as colA, 'b' as colB, [1, 2, 3] as colC", context);
assertEquals(Code.SUCCESS, result.code());
final String tableResult = "colA\tcolB\tcolC\n\"a\"\t\"b\"\t[1,2,3]\n";
assertEquals(tableResult, result.toString().replace("%table ", StringUtils.EMPTY));
-
- result = interpreter.interpret(
+
+ result =
+ interpreter.interpret(
"return 'a' as colA, 'b' as colB, [{key: \"value\"}, {key: 1}] as colC", context);
assertEquals(Code.SUCCESS, result.code());
final String tableResultWithMap =
- "colA\tcolB\tcolC\n\"a\"\t\"b\"\t[{\"key\":\"value\"},{\"key\":1}]\n";
+ "colA\tcolB\tcolC\n\"a\"\t\"b\"\t[{\"key\":\"value\"},{\"key\":1}]\n";
assertEquals(tableResultWithMap, result.toString().replace("%table ", StringUtils.EMPTY));
}
@@ -119,9 +120,12 @@ public class Neo4jCypherInterpreterTest {
@Test
public void testRenderTable() {
interpreter.open();
- InterpreterResult result = interpreter.interpret("MATCH (n:Person) "
- + "WHERE n.name IN ['name1', 'name2', 'name3'] "
- + "RETURN n.name AS name, n.age AS age", context);
+ InterpreterResult result =
+ interpreter.interpret(
+ "MATCH (n:Person) "
+ + "WHERE n.name IN ['name1', 'name2', 'name3'] "
+ + "RETURN n.name AS name, n.age AS age",
+ context);
assertEquals(Code.SUCCESS, result.code());
final String tableResult = "name\tage\n\"name1\"\t1\n\"name2\"\t2\n\"name3\"\t3\n";
assertEquals(tableResult, result.toString().replace("%table ", StringUtils.EMPTY));
@@ -131,12 +135,15 @@ public class Neo4jCypherInterpreterTest {
public void testRenderMap() {
interpreter.open();
final String jsonQuery =
- "RETURN {key: \"value\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]} as object";
+ "RETURN {key: \"value\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]} as object";
final String objectKey = "object.key";
final String objectListKey = "object.listKey";
InterpreterResult result = interpreter.interpret(jsonQuery, context);
assertEquals(Code.SUCCESS, result.code());
- String[] rows = result.toString().replace("%table ", StringUtils.EMPTY)
+ String[] rows =
+ result
+ .toString()
+ .replace("%table ", StringUtils.EMPTY)
.split(Neo4jCypherInterpreter.NEW_LINE);
assertEquals(rows.length, 2);
List<String> header = Arrays.asList(rows[0].split(Neo4jCypherInterpreter.TAB));
@@ -145,15 +152,19 @@ public class Neo4jCypherInterpreterTest {
List<String> row = Arrays.asList(rows[1].split(Neo4jCypherInterpreter.TAB));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value");
- assertEquals(row.get(header.indexOf(objectListKey)),
- "[{\"inner\":\"Map1\"},{\"inner\":\"Map2\"}]");
+ assertEquals(
+ row.get(header.indexOf(objectListKey)), "[{\"inner\":\"Map1\"},{\"inner\":\"Map2\"}]");
- final String query = "WITH [{key: \"value\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]},"
+ final String query =
+ "WITH [{key: \"value\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]},"
+ "{key: \"value2\", listKey: [{inner: \"Map12\"}, {inner: \"Map22\"}]}] "
+ "AS array UNWIND array AS object RETURN object";
result = interpreter.interpret(query, context);
assertEquals(Code.SUCCESS, result.code());
- rows = result.toString().replace("%table ", StringUtils.EMPTY)
+ rows =
+ result
+ .toString()
+ .replace("%table ", StringUtils.EMPTY)
.split(Neo4jCypherInterpreter.NEW_LINE);
assertEquals(rows.length, 3);
header = Arrays.asList(rows[0].split(Neo4jCypherInterpreter.TAB));
@@ -162,20 +173,24 @@ public class Neo4jCypherInterpreterTest {
row = Arrays.asList(rows[1].split(Neo4jCypherInterpreter.TAB));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value");
- assertEquals(row.get(header.indexOf(objectListKey)),
- "[{\"inner\":\"Map1\"},{\"inner\":\"Map2\"}]");
+ assertEquals(
+ row.get(header.indexOf(objectListKey)), "[{\"inner\":\"Map1\"},{\"inner\":\"Map2\"}]");
row = Arrays.asList(rows[2].split(Neo4jCypherInterpreter.TAB));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value2");
- assertEquals(row.get(header.indexOf(objectListKey)),
- "[{\"inner\":\"Map12\"},{\"inner\":\"Map22\"}]");
+ assertEquals(
+ row.get(header.indexOf(objectListKey)), "[{\"inner\":\"Map12\"},{\"inner\":\"Map22\"}]");
- final String jsonListWithNullQuery = "WITH [{key: \"value\", listKey: null},"
+ final String jsonListWithNullQuery =
+ "WITH [{key: \"value\", listKey: null},"
+ "{key: \"value2\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]}] "
+ "AS array UNWIND array AS object RETURN object";
result = interpreter.interpret(jsonListWithNullQuery, context);
assertEquals(Code.SUCCESS, result.code());
- rows = result.toString().replace("%table ", StringUtils.EMPTY)
+ rows =
+ result
+ .toString()
+ .replace("%table ", StringUtils.EMPTY)
.split(Neo4jCypherInterpreter.NEW_LINE);
assertEquals(rows.length, 3);
header = Arrays.asList(rows[0].split(Neo4jCypherInterpreter.TAB, -1));
@@ -189,15 +204,19 @@ public class Neo4jCypherInterpreterTest {
row = Arrays.asList(rows[2].split(Neo4jCypherInterpreter.TAB, -1));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value2");
- assertEquals(row.get(header.indexOf(objectListKey)),
- "[{\"inner\":\"Map1\"},{\"inner\":\"Map2\"}]");
-
- final String jsonListWithoutListKeyQuery = "WITH [{key: \"value\"},"
+ assertEquals(
+ row.get(header.indexOf(objectListKey)), "[{\"inner\":\"Map1\"},{\"inner\":\"Map2\"}]");
+
+ final String jsonListWithoutListKeyQuery =
+ "WITH [{key: \"value\"},"
+ "{key: \"value2\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]}] "
+ "AS array UNWIND array AS object RETURN object";
result = interpreter.interpret(jsonListWithoutListKeyQuery, context);
assertEquals(Code.SUCCESS, result.code());
- rows = result.toString().replace("%table ", StringUtils.EMPTY)
+ rows =
+ result
+ .toString()
+ .replace("%table ", StringUtils.EMPTY)
.split(Neo4jCypherInterpreter.NEW_LINE);
assertEquals(rows.length, 3);
header = Arrays.asList(rows[0].split(Neo4jCypherInterpreter.TAB, -1));
@@ -210,17 +229,18 @@ public class Neo4jCypherInterpreterTest {
row = Arrays.asList(rows[2].split(Neo4jCypherInterpreter.TAB, -1));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value2");
- assertEquals(row.get(header.indexOf(objectListKey)),
- "[{\"inner\":\"Map1\"},{\"inner\":\"Map2\"}]");
+ assertEquals(
+ row.get(header.indexOf(objectListKey)), "[{\"inner\":\"Map1\"},{\"inner\":\"Map2\"}]");
}
@Test
public void testRenderNetwork() {
interpreter.open();
- InterpreterResult result = interpreter.interpret(
- "MATCH (n)-[r:KNOWS]-(m) RETURN n, r, m LIMIT 1", context);
- GraphResult.Graph graph = gson.fromJson(result.toString().replace("%network ",
- StringUtils.EMPTY), GraphResult.Graph.class);
+ InterpreterResult result =
+ interpreter.interpret("MATCH (n)-[r:KNOWS]-(m) RETURN n, r, m LIMIT 1", context);
+ GraphResult.Graph graph =
+ gson.fromJson(
+ result.toString().replace("%network ", StringUtils.EMPTY), GraphResult.Graph.class);
assertEquals(2, graph.getNodes().size());
assertEquals(true, graph.getNodes().iterator().next().getLabel().equals(LABEL_PERSON));
assertEquals(1, graph.getEdges().size());
@@ -244,8 +264,9 @@ public class Neo4jCypherInterpreterTest {
assertEquals(Code.SUCCESS, result.code());
assertEquals(errorMsgEmpty, result.toString());
- result = interpreter.interpret("MATCH (n:Person{name: }) RETURN n.name AS name, n.age AS age",
- context);
+ result =
+ interpreter.interpret(
+ "MATCH (n:Person{name: }) RETURN n.name AS name, n.age AS age", context);
assertEquals(Code.ERROR, result.code());
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/pom.xml
----------------------------------------------------------------------
diff --git a/pig/pom.xml b/pig/pom.xml
index 571d198..4553b5c 100644
--- a/pig/pom.xml
+++ b/pig/pom.xml
@@ -190,13 +190,6 @@
<forkMode>always</forkMode>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <skip>false</skip>
- </configuration>
- </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
index 9503aa7..97d15de 100644
--- a/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
+++ b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
@@ -17,28 +17,24 @@
package org.apache.zeppelin.pig;
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Field;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/**
- *
- */
+/** */
public abstract class BasePigInterpreter extends Interpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(BasePigInterpreter.class);
@@ -60,7 +56,7 @@ public abstract class BasePigInterpreter extends Interpreter {
for (String jobId : jobIds) {
LOGGER.info("Kill jobId:" + jobId);
HExecutionEngine engine =
- (HExecutionEngine) getPigServer().getPigContext().getExecutionEngine();
+ (HExecutionEngine) getPigServer().getPigContext().getExecutionEngine();
try {
Field launcherField = HExecutionEngine.class.getDeclaredField("launcher");
launcherField.setAccessible(true);
@@ -72,8 +68,8 @@ public abstract class BasePigInterpreter extends Interpreter {
}
}
} else {
- LOGGER.warn("No PigScriptListener found, can not cancel paragraph:"
- + context.getParagraphId());
+ LOGGER.warn(
+ "No PigScriptListener found, can not cancel paragraph:" + context.getParagraphId());
}
}
@@ -93,14 +89,15 @@ public abstract class BasePigInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler(
- PigInterpreter.class.getName() + this.hashCode());
+ return SchedulerFactory.singleton()
+ .createOrGetFIFOScheduler(PigInterpreter.class.getName() + this.hashCode());
}
public abstract PigServer getPigServer();
/**
* Use paragraph title if it exists, else use the last line of pig script.
+ *
* @param cmd
* @param context
* @return
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
index 4fc0676..5f79e97 100644
--- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
@@ -17,6 +17,11 @@
package org.apache.zeppelin.pig;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Properties;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.PigServer;
@@ -24,22 +29,13 @@ import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Map;
-import java.util.Properties;
-
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/**
- * Pig interpreter for Zeppelin.
- */
+/** Pig interpreter for Zeppelin. */
public class PigInterpreter extends BasePigInterpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(PigInterpreter.class);
@@ -64,8 +60,10 @@ public class PigInterpreter extends BasePigInterpreter {
pigServer = new PigServer(execType);
for (Map.Entry entry : getProperties().entrySet()) {
if (!entry.getKey().toString().startsWith("zeppelin.")) {
- pigServer.getPigContext().getProperties().setProperty(entry.getKey().toString(),
- entry.getValue().toString());
+ pigServer
+ .getPigContext()
+ .getProperties()
+ .setProperty(entry.getKey().toString(), entry.getValue().toString());
}
}
} catch (IOException e) {
@@ -79,7 +77,6 @@ public class PigInterpreter extends BasePigInterpreter {
pigServer = null;
}
-
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
// remember the origial stdout, because we will redirect stdout to capture
@@ -146,10 +143,7 @@ public class PigInterpreter extends BasePigInterpreter {
return new InterpreterResult(Code.SUCCESS, outputBuilder.toString());
}
-
public PigServer getPigServer() {
return pigServer;
}
-
}
-
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
index 97c2f6e..0c6eef6 100644
--- a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
@@ -17,6 +17,12 @@
package org.apache.zeppelin.pig;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.PigServer;
@@ -26,25 +32,15 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.ResultMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/**
- *
- */
+/** */
public class PigQueryInterpreter extends BasePigInterpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(PigQueryInterpreter.class);
private static final String MAX_RESULTS = "zeppelin.pig.maxResult";
@@ -62,9 +58,7 @@ public class PigQueryInterpreter extends BasePigInterpreter {
}
@Override
- public void close() {
-
- }
+ public void close() {}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
index 8ff1bf8..0e6068a 100644
--- a/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
@@ -17,6 +17,8 @@
package org.apache.zeppelin.pig;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
@@ -24,12 +26,7 @@ import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- *
- */
+/** */
public class PigScriptListener implements PigProgressNotificationListener {
private static final Logger LOGGER = LoggerFactory.getLogger(PigScriptListener.class);
@@ -37,19 +34,13 @@ public class PigScriptListener implements PigProgressNotificationListener {
private int progress;
@Override
- public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {
-
- }
+ public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {}
@Override
- public void launchStartedNotification(String scriptId, int numJobsToLaunch) {
-
- }
+ public void launchStartedNotification(String scriptId, int numJobsToLaunch) {}
@Override
- public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {
-
- }
+ public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {}
@Override
public void jobStartedNotification(String scriptId, String assignedJobId) {
@@ -57,19 +48,13 @@ public class PigScriptListener implements PigProgressNotificationListener {
}
@Override
- public void jobFinishedNotification(String scriptId, JobStats jobStats) {
-
- }
+ public void jobFinishedNotification(String scriptId, JobStats jobStats) {}
@Override
- public void jobFailedNotification(String scriptId, JobStats jobStats) {
-
- }
+ public void jobFailedNotification(String scriptId, JobStats jobStats) {}
@Override
- public void outputCompletedNotification(String scriptId, OutputStats outputStats) {
-
- }
+ public void outputCompletedNotification(String scriptId, OutputStats outputStats) {}
@Override
public void progressUpdatedNotification(String scriptId, int progress) {
@@ -78,9 +63,7 @@ public class PigScriptListener implements PigProgressNotificationListener {
}
@Override
- public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
-
- }
+ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {}
public Set<String> getJobIds() {
return jobIds;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
index 1c48250..28fd763 100644
--- a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
@@ -17,19 +17,16 @@
package org.apache.zeppelin.pig;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/**
- *
- */
+/** */
public class PigUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(PigUtils.class);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java
----------------------------------------------------------------------
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java
index ea1a3f8..1fd5c82 100644
--- a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java
@@ -1,37 +1,31 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.zeppelin.pig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.commons.io.IOUtils;
-import org.junit.After;
-import org.junit.Test;
-
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
-
+import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.After;
+import org.junit.Test;
public class PigInterpreterSparkTest {
private PigInterpreter pigInterpreter;
@@ -44,8 +38,8 @@ public class PigInterpreterSparkTest {
pigInterpreter = new PigInterpreter(properties);
pigInterpreter.open();
context = InterpreterContext.builder().setParagraphId("paragraphId").build();
-
}
+
@After
public void tearDown() {
pigInterpreter.close();
@@ -55,41 +49,44 @@ public class PigInterpreterSparkTest {
public void testBasics() throws IOException {
setUpSpark(false);
- String content = "1\tandy\n"
- + "2\tpeter\n";
+ String content = "1\tandy\n" + "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
- String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
- + "dump a;";
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
// describe
- pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
- + "describe a;";
+ pigscript =
+ "a = load '"
+ + tmpFile.getAbsolutePath()
+ + "' as (id: int, name: bytearray);"
+ + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
- pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
- + "describe a;";
+ pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertTrue(result.message().get(0).getData().contains(
- "Syntax error, unexpected symbol at or near 'a'"));
+ assertTrue(
+ result
+ .message()
+ .get(0)
+ .getData()
+ .contains("Syntax error, unexpected symbol at or near 'a'"));
// syntax error
- pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
- + "foreach a generate $0;";
+ pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + "foreach a generate $0;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
@@ -100,16 +97,14 @@ public class PigInterpreterSparkTest {
public void testIncludeJobStats() throws IOException {
setUpSpark(true);
- String content = "1\tandy\n"
- + "2\tpeter\n";
+ String content = "1\tandy\n" + "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
- String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
- + "dump a;";
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -117,8 +112,11 @@ public class PigInterpreterSparkTest {
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
// describe
- pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
- + "describe a;";
+ pigscript =
+ "a = load '"
+ + tmpFile.getAbsolutePath()
+ + "' as (id: int, name: bytearray);"
+ + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -126,24 +124,23 @@ public class PigInterpreterSparkTest {
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
- pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
- + "describe a;";
+ pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
// no job is launched, so no jobStats
- assertTrue(result.message().get(0).getData().contains(
- "Syntax error, unexpected symbol at or near 'a'"));
+ assertTrue(
+ result
+ .message()
+ .get(0)
+ .getData()
+ .contains("Syntax error, unexpected symbol at or near 'a'"));
// execution error
- pigscript = "a = load 'invalid_path';"
- + "dump a;";
+ pigscript = "a = load 'invalid_path';" + "dump a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertTrue(result.message().get(0).getData().contains("Failed to read data from"));
}
-
}
-
-
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
----------------------------------------------------------------------
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
index 5a21bb3..f1f6ad8 100644
--- a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
@@ -1,39 +1,33 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.zeppelin.pig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.commons.io.IOUtils;
-import org.junit.After;
-import org.junit.Test;
-
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
-
+import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.junit.After;
+import org.junit.Test;
public class PigInterpreterTest {
@@ -58,23 +52,24 @@ public class PigInterpreterTest {
public void testBasics() throws IOException {
setUpLocal(false);
- String content = "1\tandy\n"
- + "2\tpeter\n";
+ String content = "1\tandy\n" + "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
- String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
- + "dump a;";
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
// describe
- pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+ pigscript =
+ "a = load '"
+ + tmpFile.getAbsolutePath()
+ + "' as (id: int, name: bytearray);"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
@@ -82,38 +77,37 @@ public class PigInterpreterTest {
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
- pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
- + "describe a;";
+ pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.ERROR, result.code());
- assertTrue(result.message().get(0).getData().contains(
- "Syntax error, unexpected symbol at or near 'a'"));
+ assertTrue(
+ result
+ .message()
+ .get(0)
+ .getData()
+ .contains("Syntax error, unexpected symbol at or near 'a'"));
// execution error
- pigscript = "a = load 'invalid_path';"
- + "dump a;";
+ pigscript = "a = load 'invalid_path';" + "dump a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.ERROR, result.code());
assertTrue(result.message().get(0).getData().contains("Input path does not exist"));
}
-
@Test
public void testIncludeJobStats() throws IOException {
setUpLocal(true);
- String content = "1\tandy\n"
- + "2\tpeter\n";
+ String content = "1\tandy\n" + "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
- String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
- + "dump a;";
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.SUCCESS, result.code());
@@ -121,7 +115,10 @@ public class PigInterpreterTest {
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
// describe
- pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+ pigscript =
+ "a = load '"
+ + tmpFile.getAbsolutePath()
+ + "' as (id: int, name: bytearray);"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
@@ -131,24 +128,25 @@ public class PigInterpreterTest {
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
- pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
- + "describe a;";
+ pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.ERROR, result.code());
// no job is launched, so no jobStats
assertTrue(!result.message().get(0).getData().contains("Counters:"));
- assertTrue(result.message().get(0).getData().contains(
- "Syntax error, unexpected symbol at or near 'a'"));
+ assertTrue(
+ result
+ .message()
+ .get(0)
+ .getData()
+ .contains("Syntax error, unexpected symbol at or near 'a'"));
// execution error
- pigscript = "a = load 'invalid_path';"
- + "dump a;";
+ pigscript = "a = load 'invalid_path';" + "dump a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.ERROR, result.code());
assertTrue(result.message().get(0).getData().contains("Counters:"));
assertTrue(result.message().get(0).getData().contains("Input path does not exist"));
}
-
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java
----------------------------------------------------------------------
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java
index ec09a88..14d9686 100644
--- a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java
@@ -1,39 +1,33 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.zeppelin.pig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.commons.io.IOUtils;
-import org.junit.After;
-import org.junit.Test;
-
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
-
+import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.junit.After;
+import org.junit.Test;
public class PigInterpreterTezTest {
@@ -48,8 +42,8 @@ public class PigInterpreterTezTest {
pigInterpreter = new PigInterpreter(properties);
pigInterpreter.open();
context = InterpreterContext.builder().setParagraphId("paragraphId").build();
-
}
+
@After
public void tearDown() {
pigInterpreter.close();
@@ -59,45 +53,52 @@ public class PigInterpreterTezTest {
public void testBasics() throws IOException {
setUpTez(false);
- assertEquals("test",
- pigInterpreter.getPigServer().getPigContext().getProperties()
+ assertEquals(
+ "test",
+ pigInterpreter
+ .getPigServer()
+ .getPigContext()
+ .getProperties()
.getProperty("tez.queue.name"));
-
- String content = "1\tandy\n"
- + "2\tpeter\n";
+
+ String content = "1\tandy\n" + "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
- String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
- + "dump a;";
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
// describe
- pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
- + "describe a;";
+ pigscript =
+ "a = load '"
+ + tmpFile.getAbsolutePath()
+ + "' as (id: int, name: bytearray);"
+ + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
- pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
- + "describe a;";
+ pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.ERROR, result.code());
- assertTrue(result.message().get(0).getData().contains(
- "Syntax error, unexpected symbol at or near 'a'"));
+ assertTrue(
+ result
+ .message()
+ .get(0)
+ .getData()
+ .contains("Syntax error, unexpected symbol at or near 'a'"));
// syntax error
- pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
- + "foreach a generate $0;";
+ pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + "foreach a generate $0;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.ERROR, result.code());
@@ -108,16 +109,14 @@ public class PigInterpreterTezTest {
public void testIncludeJobStats() throws IOException {
setUpTez(true);
- String content = "1\tandy\n"
- + "2\tpeter\n";
+ String content = "1\tandy\n" + "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
- String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
- + "dump a;";
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.SUCCESS, result.code());
@@ -125,8 +124,11 @@ public class PigInterpreterTezTest {
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
// describe
- pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
- + "describe a;";
+ pigscript =
+ "a = load '"
+ + tmpFile.getAbsolutePath()
+ + "' as (id: int, name: bytearray);"
+ + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.SUCCESS, result.code());
@@ -135,19 +137,21 @@ public class PigInterpreterTezTest {
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
- pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
- + "describe a;";
+ pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.ERROR, result.code());
// no job is launched, so no jobStats
assertTrue(!result.message().get(0).getData().contains("Vertex Stats"));
- assertTrue(result.message().get(0).getData().contains(
- "Syntax error, unexpected symbol at or near 'a'"));
+ assertTrue(
+ result
+ .message()
+ .get(0)
+ .getData()
+ .contains("Syntax error, unexpected symbol at or near 'a'"));
// execution error
- pigscript = "a = load 'invalid_path';"
- + "dump a;";
+ pigscript = "a = load 'invalid_path';" + "dump a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.message().get(0).getType());
assertEquals(Code.ERROR, result.code());
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
----------------------------------------------------------------------
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
index 01f0a20..a850174 100644
--- a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
@@ -1,48 +1,40 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.zeppelin.pig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.commons.io.IOUtils;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-
+import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
-/**
- *
- */
+/** */
public class PigQueryInterpreterTest {
private Interpreter pigInterpreter;
@@ -78,23 +70,28 @@ public class PigQueryInterpreterTest {
@Test
public void testBasics() throws IOException, InterpreterException {
- String content = "andy\tmale\t10\n"
- + "peter\tmale\t20\n"
- + "amy\tfemale\t14\n";
+ String content = "andy\tmale\t10\n" + "peter\tmale\t20\n" + "amy\tfemale\t14\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// run script in PigInterpreter
- String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (name, gender, age);\n"
+ String pigscript =
+ "a = load '"
+ + tmpFile.getAbsolutePath()
+ + "' as (name, gender, age);\n"
+ "a2 = load 'invalid_path' as (name, gender, age);\n"
+ "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertTrue(result.message().get(0).getData().contains(
- "(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
+ assertTrue(
+ result
+ .message()
+ .get(0)
+ .getData()
+ .contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
// run single line query in PigQueryInterpreter
String query = "foreach a generate name, age;";
@@ -118,13 +115,18 @@ public class PigQueryInterpreterTest {
assertEquals("group\tcol_1\nmale\t2\nfemale\t1\n", result.message().get(0).getData());
// syntax error in PigQueryInterpereter
- query = "b = group a by invalid_column;\nforeach b generate group as gender, " +
- "COUNT($1) as count;";
+ query =
+ "b = group a by invalid_column;\nforeach b generate group as gender, "
+ + "COUNT($1) as count;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertTrue(result.message().get(0).getData().contains(
- "Projected field [invalid_column] does not exist in schema"));
+ assertTrue(
+ result
+ .message()
+ .get(0)
+ .getData()
+ .contains("Projected field [invalid_column] does not exist in schema"));
// execution error in PigQueryInterpreter
query = "foreach a2 generate name, age;";
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 60af8bc..a9d3cc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,7 @@
<plugin.download.version>1.3.0</plugin.download.version>
<plugin.deploy.version>2.8.2</plugin.deploy.version>
<plugin.shade.version>3.1.1</plugin.shade.version>
+ <plugin.fmt.version>2.5.1</plugin.fmt.version>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
@@ -397,46 +398,6 @@
</plugin>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.17</version>
- <configuration>
- <skip>true</skip>
- <failOnViolation>false</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <sourceDirectories>${basedir}/src/main/java,${basedir}/src/main/scala</sourceDirectories>
- <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>
- <configLocation>_tools/checkstyle.xml</configLocation>
- <outputFile>${basedir}/target/checkstyle-output.xml</outputFile>
- <inputEncoding>${project.build.sourceEncoding}</inputEncoding>
- <outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
- </configuration>
- <executions>
- <execution>
- <id>checkstyle-fail-build</id>
- <phase>validate</phase>
- <goals>
- <goal>check</goal>
- </goals>
- <configuration>
- <failOnViolation>true</failOnViolation>
- <excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/python/proto/*</excludes>
- </configuration>
- </execution>
- <execution>
- <id>checkstyle-gen-html-report</id>
- <phase>install</phase>
- <goals>
- <goal>checkstyle-aggregate</goal>
- </goals>
- <configuration>
- <excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/python/proto/*</excludes>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>${plugin.resource.version}</version>
<executions>
@@ -569,14 +530,29 @@
</configuration>
</plugin>
-->
+ <plugin>
+ <groupId>com.coveo</groupId>
+ <artifactId>fmt-maven-plugin</artifactId>
+ </plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>${plugin.checkstyle.version}</version>
+ <groupId>com.coveo</groupId>
+ <artifactId>fmt-maven-plugin</artifactId>
+ <version>${plugin.fmt.version}</version>
+ <configuration>
+ <displayLimit>10000</displayLimit>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/python/pom.xml
----------------------------------------------------------------------
diff --git a/python/pom.xml b/python/pom.xml
index d11c165..b60e423 100644
--- a/python/pom.xml
+++ b/python/pom.xml
@@ -175,14 +175,6 @@
<plugin>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <skip>false</skip>
- </configuration>
- </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
index b9c897b..200fcf6 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
@@ -1,25 +1,30 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.zeppelin.python;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.proto.CancelRequest;
@@ -37,15 +42,7 @@ import org.apache.zeppelin.python.proto.StopRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.security.SecureRandom;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Grpc client for IPython kernel
- */
+/** Grpc client for IPython kernel */
public class IPythonClient {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonClient.class.getName());
@@ -56,16 +53,12 @@ public class IPythonClient {
private SecureRandom random = new SecureRandom();
- /**
- * Construct client for accessing RouteGuide server at {@code host:port}.
- */
+ /** Construct client for accessing RouteGuide server at {@code host:port}. */
public IPythonClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true));
}
- /**
- * Construct client for accessing RouteGuide server using the existing channel.
- */
+ /** Construct client for accessing RouteGuide server using the existing channel. */
public IPythonClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = IPythonGrpc.newBlockingStub(channel);
@@ -78,86 +71,91 @@ public class IPythonClient {
// execute the code and make the output as streaming by writing it to InterpreterOutputStream
// one by one.
- public ExecuteResponse stream_execute(ExecuteRequest request,
- final InterpreterOutputStream interpreterOutput) {
- final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
- .setStatus(ExecuteStatus.SUCCESS);
+ public ExecuteResponse stream_execute(
+ ExecuteRequest request, final InterpreterOutputStream interpreterOutput) {
+ final ExecuteResponse.Builder finalResponseBuilder =
+ ExecuteResponse.newBuilder().setStatus(ExecuteStatus.SUCCESS);
final AtomicBoolean completedFlag = new AtomicBoolean(false);
LOGGER.debug("stream_execute code:\n" + request.getCode());
- asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
- int index = 0;
- boolean isPreviousOutputImage = false;
-
- @Override
- public void onNext(ExecuteResponse executeResponse) {
- if (executeResponse.getType() == OutputType.TEXT) {
- try {
- LOGGER.debug("Interpreter Streaming Output: " + executeResponse.getOutput());
- if (isPreviousOutputImage) {
- // add '\n' when switch from image to text
- interpreterOutput.write("\n%text ".getBytes());
+ asyncStub.execute(
+ request,
+ new StreamObserver<ExecuteResponse>() {
+ int index = 0;
+ boolean isPreviousOutputImage = false;
+
+ @Override
+ public void onNext(ExecuteResponse executeResponse) {
+ if (executeResponse.getType() == OutputType.TEXT) {
+ try {
+ LOGGER.debug("Interpreter Streaming Output: " + executeResponse.getOutput());
+ if (isPreviousOutputImage) {
+ // add '\n' when switch from image to text
+ interpreterOutput.write("\n%text ".getBytes());
+ }
+ isPreviousOutputImage = false;
+ interpreterOutput.write(executeResponse.getOutput().getBytes());
+ interpreterOutput.getInterpreterOutput().flush();
+ } catch (IOException e) {
+ LOGGER.error("Unexpected IOException", e);
+ }
}
- isPreviousOutputImage = false;
- interpreterOutput.write(executeResponse.getOutput().getBytes());
- interpreterOutput.getInterpreterOutput().flush();
- } catch (IOException e) {
- LOGGER.error("Unexpected IOException", e);
- }
- }
- if (executeResponse.getType() == OutputType.IMAGE) {
- try {
- LOGGER.debug("Interpreter Streaming Output: IMAGE_DATA");
- if (index != 0) {
- // add '\n' if this is the not the first element. otherwise it would mix the image
- // with the text
- interpreterOutput.write("\n".getBytes());
+ if (executeResponse.getType() == OutputType.IMAGE) {
+ try {
+ LOGGER.debug("Interpreter Streaming Output: IMAGE_DATA");
+ if (index != 0) {
+ // add '\n' if this is the not the first element. otherwise it would mix the image
+ // with the text
+ interpreterOutput.write("\n".getBytes());
+ }
+ interpreterOutput.write(("%img " + executeResponse.getOutput()).getBytes());
+ interpreterOutput.getInterpreterOutput().flush();
+ isPreviousOutputImage = true;
+ } catch (IOException e) {
+ LOGGER.error("Unexpected IOException", e);
+ }
+ }
+ if (executeResponse.getStatus() == ExecuteStatus.ERROR) {
+ // set the finalResponse to ERROR if any ERROR happens, otherwise the finalResponse
+ // would
+ // be SUCCESS.
+ finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
}
- interpreterOutput.write(("%img " + executeResponse.getOutput()).getBytes());
- interpreterOutput.getInterpreterOutput().flush();
- isPreviousOutputImage = true;
- } catch (IOException e) {
- LOGGER.error("Unexpected IOException", e);
+ index++;
}
- }
- if (executeResponse.getStatus() == ExecuteStatus.ERROR) {
- // set the finalResponse to ERROR if any ERROR happens, otherwise the finalResponse would
- // be SUCCESS.
- finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
- }
- index++;
- }
- @Override
- public void onError(Throwable throwable) {
- try {
- interpreterOutput.getInterpreterOutput().write(ExceptionUtils.getStackTrace(throwable));
- interpreterOutput.getInterpreterOutput().flush();
- } catch (IOException e) {
- LOGGER.error("Unexpected IOException", e);
- }
- LOGGER.error("Fail to call IPython grpc", throwable);
- finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
+ @Override
+ public void onError(Throwable throwable) {
+ try {
+ interpreterOutput
+ .getInterpreterOutput()
+ .write(ExceptionUtils.getStackTrace(throwable));
+ interpreterOutput.getInterpreterOutput().flush();
+ } catch (IOException e) {
+ LOGGER.error("Unexpected IOException", e);
+ }
+ LOGGER.error("Fail to call IPython grpc", throwable);
+ finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
- completedFlag.set(true);
- synchronized (completedFlag) {
- completedFlag.notify();
- }
- }
+ completedFlag.set(true);
+ synchronized (completedFlag) {
+ completedFlag.notify();
+ }
+ }
- @Override
- public void onCompleted() {
- synchronized (completedFlag) {
- try {
- LOGGER.debug("stream_execute is completed");
- interpreterOutput.getInterpreterOutput().flush();
- } catch (IOException e) {
- LOGGER.error("Unexpected IOException", e);
+ @Override
+ public void onCompleted() {
+ synchronized (completedFlag) {
+ try {
+ LOGGER.debug("stream_execute is completed");
+ interpreterOutput.getInterpreterOutput().flush();
+ } catch (IOException e) {
+ LOGGER.error("Unexpected IOException", e);
+ }
+ completedFlag.set(true);
+ completedFlag.notify();
+ }
}
- completedFlag.set(true);
- completedFlag.notify();
- }
- }
- });
+ });
synchronized (completedFlag) {
if (!completedFlag.get()) {
@@ -204,14 +202,12 @@ public class IPythonClient {
asyncStub.stop(request, null);
}
-
public static void main(String[] args) {
IPythonClient client = new IPythonClient("localhost", 50053);
client.status(StatusRequest.newBuilder().build());
- ExecuteResponse response = client.block_execute(ExecuteRequest.newBuilder().
- setCode("abcd=2").build());
+ ExecuteResponse response =
+ client.block_execute(ExecuteRequest.newBuilder().setCode("abcd=2").build());
System.out.println(response.getOutput());
-
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index 3c646ae..aebed5c 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -18,6 +18,16 @@
package org.apache.zeppelin.python;
import io.grpc.ManagedChannelBuilder;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
@@ -52,20 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * IPython Interpreter for Zeppelin
- */
+/** IPython Interpreter for Zeppelin */
public class IPythonInterpreter extends Interpreter implements ExecuteResultHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class);
@@ -90,8 +87,8 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
/**
- * Sub class can customize the interpreter by adding more python packages under PYTHONPATH.
- * e.g. PySparkInterpreter
+ * Sub class can customize the interpreter by adding more python packages under PYTHONPATH. e.g.
+ * PySparkInterpreter
*
* @param additionalPythonPath
*/
@@ -101,8 +98,8 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
/**
- * Sub class can customize the interpreter by running additional python init code.
- * e.g. PySparkInterpreter
+ * Sub class can customize the interpreter by running additional python init code. e.g.
+ * PySparkInterpreter
*
* @param additionalPythonInitFile
*/
@@ -131,18 +128,22 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
LOGGER.info("Python Exec: " + pythonExecutable);
String checkPrerequisiteResult = checkIPythonPrerequisite(pythonExecutable);
if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
- throw new InterpreterException("IPython prerequisite is not meet: " +
- checkPrerequisiteResult);
+ throw new InterpreterException(
+ "IPython prerequisite is not meet: " + checkPrerequisiteResult);
}
- ipythonLaunchTimeout = Long.parseLong(
- getProperty("zeppelin.ipython.launch.timeout", "30000"));
+ ipythonLaunchTimeout =
+ Long.parseLong(getProperty("zeppelin.ipython.launch.timeout", "30000"));
this.zeppelinContext = buildZeppelinContext();
int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
- int message_size = Integer.parseInt(getProperty("zeppelin.ipython.grpc.message_size",
- 32 * 1024 * 1024 + ""));
- ipythonClient = new IPythonClient(ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort)
- .usePlaintext(true).maxInboundMessageSize(message_size));
+ int message_size =
+ Integer.parseInt(
+ getProperty("zeppelin.ipython.grpc.message_size", 32 * 1024 * 1024 + ""));
+ ipythonClient =
+ new IPythonClient(
+ ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort)
+ .usePlaintext(true)
+ .maxInboundMessageSize(message_size));
this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true"));
this.secret = PythonUtils.createSecret(256);
launchIPythonKernel(ipythonPort);
@@ -153,8 +154,8 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
/**
- * non-empty return value mean the errors when checking ipython prerequisite.
- * empty value mean IPython prerequisite is meet.
+ * non-empty return value mean the errors when checking ipython prerequisite. empty value mean
+ * IPython prerequisite is meet.
*
* @param pythonExec
* @return
@@ -170,8 +171,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
Process proc = processBuilder.start();
int ret = proc.waitFor();
if (ret != 0) {
- return "Fail to run pip freeze.\n" +
- IOUtils.toString(new FileInputStream(stderrFile));
+ return "Fail to run pip freeze.\n" + IOUtils.toString(new FileInputStream(stderrFile));
}
String freezeOutput = IOUtils.toString(new FileInputStream(stdoutFile));
if (!freezeOutput.contains("jupyter-client=")) {
@@ -206,26 +206,34 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
InputStream input =
getClass().getClassLoader().getResourceAsStream("grpc/python/zeppelin_python.py");
List<String> lines = IOUtils.readLines(input);
- ExecuteResponse response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
- .setCode(StringUtils.join(lines, System.lineSeparator())
- .replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
- .replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build());
+ ExecuteResponse response =
+ ipythonClient.block_execute(
+ ExecuteRequest.newBuilder()
+ .setCode(
+ StringUtils.join(lines, System.lineSeparator())
+ .replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
+ .replace("${JVM_GATEWAY_ADDRESS}", serverAddress))
+ .build());
if (response.getStatus() == ExecuteStatus.ERROR) {
throw new IOException("Fail to setup JVMGateway\n" + response.getOutput());
}
- input =
- getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
+ input = getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
lines = IOUtils.readLines(input);
- response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
- .setCode(StringUtils.join(lines, System.lineSeparator())).build());
+ response =
+ ipythonClient.block_execute(
+ ExecuteRequest.newBuilder()
+ .setCode(StringUtils.join(lines, System.lineSeparator()))
+ .build());
if (response.getStatus() == ExecuteStatus.ERROR) {
throw new IOException("Fail to import ZeppelinContext\n" + response.getOutput());
}
- response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
- .setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
- .build());
+ response =
+ ipythonClient.block_execute(
+ ExecuteRequest.newBuilder()
+ .setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
+ .build());
if (response.getStatus() == ExecuteStatus.ERROR) {
throw new IOException("Fail to setup ZeppelinContext\n" + response.getOutput());
}
@@ -233,27 +241,31 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
if (additionalPythonInitFile != null) {
input = getClass().getClassLoader().getResourceAsStream(additionalPythonInitFile);
lines = IOUtils.readLines(input);
- response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
- .setCode(StringUtils.join(lines, System.lineSeparator())
- .replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
- .replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build());
+ response =
+ ipythonClient.block_execute(
+ ExecuteRequest.newBuilder()
+ .setCode(
+ StringUtils.join(lines, System.lineSeparator())
+ .replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
+ .replace("${JVM_GATEWAY_ADDRESS}", serverAddress))
+ .build());
if (response.getStatus() == ExecuteStatus.ERROR) {
- throw new IOException("Fail to run additional Python init file: "
- + additionalPythonInitFile + "\n" + response.getOutput());
+ throw new IOException(
+ "Fail to run additional Python init file: "
+ + additionalPythonInitFile
+ + "\n"
+ + response.getOutput());
}
}
}
-
- private void launchIPythonKernel(int ipythonPort)
- throws IOException {
+ private void launchIPythonKernel(int ipythonPort) throws IOException {
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
// copy the python scripts to a temp directory, then launch ipython kernel in that folder
File pythonWorkDir = Files.createTempDirectory("zeppelin_ipython").toFile();
String[] ipythonScripts = {"ipython_server.py", "ipython_pb2.py", "ipython_pb2_grpc.py"};
for (String ipythonScript : ipythonScripts) {
- URL url = getClass().getClassLoader().getResource("grpc/python"
- + "/" + ipythonScript);
+ URL url = getClass().getClassLoader().getResource("grpc/python" + "/" + ipythonScript);
FileUtils.copyURLToFile(url, new File(pythonWorkDir, ipythonScript));
}
@@ -267,10 +279,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
executor.setWatchdog(watchDog);
if (useBuiltinPy4j) {
- //TODO(zjffdu) don't do hard code on py4j here
+ // TODO(zjffdu) don't do hard code on py4j here
File py4jDestFile = new File(pythonWorkDir, "py4j-src-0.10.7.zip");
- FileUtils.copyURLToFile(getClass().getClassLoader().getResource(
- "python/py4j-src-0.10.7.zip"), py4jDestFile);
+ FileUtils.copyURLToFile(
+ getClass().getClassLoader().getResource("python/py4j-src-0.10.7.zip"), py4jDestFile);
if (additionalPythonPath != null) {
// put the py4j at the end, because additionalPythonPath may already contain py4j.
// e.g. PySparkInterpreter
@@ -306,8 +318,8 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
if ((System.currentTimeMillis() - startTime) > ipythonLaunchTimeout) {
- throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000
- + " seconds");
+ throw new IOException(
+ "Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000 + " seconds");
}
}
}
@@ -345,15 +357,15 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
zeppelinContext.setInterpreterContext(context);
interpreterOutput.setInterpreterOutput(context.out);
ExecuteResponse response =
- ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
- interpreterOutput);
+ ipythonClient.stream_execute(
+ ExecuteRequest.newBuilder().setCode(st).build(), interpreterOutput);
try {
interpreterOutput.getInterpreterOutput().flush();
} catch (IOException e) {
throw new RuntimeException("Fail to write output", e);
}
- InterpreterResult result = new InterpreterResult(
- InterpreterResult.Code.valueOf(response.getStatus().name()));
+ InterpreterResult result =
+ new InterpreterResult(InterpreterResult.Code.valueOf(response.getStatus().name()));
return result;
}
@@ -373,14 +385,17 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
@Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
+ public List<InterpreterCompletion> completion(
+ String buf, int cursor, InterpreterContext interpreterContext) {
LOGGER.debug("Call completion for: " + buf);
List<InterpreterCompletion> completions = new ArrayList<>();
CompletionResponse response =
ipythonClient.complete(
- CompletionRequest.getDefaultInstance().newBuilder().setCode(buf)
- .setCursor(cursor).build());
+ CompletionRequest.getDefaultInstance()
+ .newBuilder()
+ .setCode(buf)
+ .setCursor(cursor)
+ .build());
for (int i = 0; i < response.getMatchesCount(); i++) {
String match = response.getMatches(i);
int lastIndexOfDot = match.lastIndexOf(".");