You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by rp...@apache.org on 2021/05/18 22:17:33 UTC
[incubator-wayang] 02/04: [WAYANG-30] Flink cluster submitting plan
testing
This is an automated email from the ASF dual-hosted git repository.
rpardomeza pushed a commit to branch debugger-sidecar
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 7f0ea3be052aefb4ba059d7b72776da190f32f18
Author: rodrigopardomeza <ro...@gmail.com>
AuthorDate: Tue May 18 18:15:19 2021 -0400
[WAYANG-30] Flink cluster submitting plan testing
---
.../org/apache/wayang/hackit/sidecar/test.java | 225 +++++++++++++++++++++
1 file changed, 225 insertions(+)
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/test.java b/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/test.java
new file mode 100644
index 0000000..816384a
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/test.java
@@ -0,0 +1,225 @@
+package org.apache.wayang.hackit.sidecar;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Collector;
+import org.reflections.Reflections;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.function.Function;
+import java.util.jar.JarOutputStream;
+import java.util.jar.JarEntry;
+import java.util.zip.ZipEntry;
+
+public class test {
+
+ public static void main(String... args) throws Exception {
+ System.out.println(Arrays.toString(getJars()));
+ ExecutionEnvironment env = ExecutionEnvironment
+ .createRemoteEnvironment(
+ "127.0.0.1",
+ 8081,
+ //"/Users/rodrigopardomeza/tu-berlin/debugger/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/target/wayang-hackit-sidecar-0.6.0-SNAPSHOT.jar"
+ getJars()
+ );
+
+ String clusterPath = "/mnt/example/";
+ //DataSet<String> data = env.readTextFile("/Users/rodrigopardomeza/flink/count");
+ //DataSet<String> data = env.readTextFile(clusterPath + "count");
+ DataSet<String> data = env.readTextFile("/mnt/example/count");
+
+ System.out.println(data.count());
+
+ DataSet<String> words = data.flatMap(
+ /*(FlatMapFunction<String, String>) (s,l) -> {
+ for (String ss : s.split(" ")) {
+ l.collect(ss);
+ }
+ }*/
+ new FlatMapFunction<String, String>() {
+ @Override
+ public void flatMap(String s, Collector<String> collector) throws Exception {
+ String[] wor = s.split(" ");
+ for (String ss: wor
+ ) {
+ collector.collect(ss);
+ }
+ }
+ }
+
+ );
+ Function<Integer, Integer> lala = a -> a + 2;
+ DataSet<String> data2 = words
+ .filter((FilterFunction<String>) value -> value.startsWith("T"));
+ /*DataSet<String> words = data.flatMap(
+ new FlatMapFunction<String, String>() {
+ @Override
+ public void flatMap(String s, Collector<String> collector) throws Exception {
+ String[] wor = s.split(" ");
+ for (String ss: wor
+ ) {
+ collector.collect(ss);
+ }
+ }
+ }
+
+ );*/
+ //.filter(line -> line.startsWith("T"))
+ data2.writeAsText( "/mnt/example/cluster_4", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
+ //.writeAsText("/Users/rodrigopardomeza/flink/cluster_babe");
+
+ env.execute();
+
+ }
+
+ private static String[] getJars(){
+ List<String> jars = new ArrayList<>(5);
+ List<Class> clazzs = Arrays.asList(new Class[]{test.class});
+
+ clazzs.stream().map(
+ test::getDeclaringJar
+ ).filter(
+ element -> element != null
+ ).forEach(jars::add);
+
+ return jars.toArray(new String[0]);
+ }
+
+ public static String getDeclaringJar(Class<?> cls) {
+ try {
+ final URL location = cls.getProtectionDomain().getCodeSource().getLocation();
+ final URI uri = location.toURI();
+ String path = uri.getPath();
+ if (path.endsWith(".jar")) {
+ return path;
+ } else {
+ System.out.println(
+ String.format(
+ "Class %s is not loaded from a JAR file, but from %s. Thus, cannot provide the JAR file.",
+ cls,
+ path
+ )
+ );
+ path = createJAR(cls);
+ return path;
+ }
+ } catch (Exception e) {
+ System.out.println(
+ String.format(
+ "Could not determine JAR file declaring %s.",
+ cls
+ )
+ );
+ }
+ return null;
+ }
+
+ public static String createJAR(Class clazz){
+ try {
+ Path tmpCustomPrefix = Files.createTempDirectory("lala");
+ String path_final = tmpCustomPrefix.toString() + "/lala.jar";
+ //String path_final = "/Users/rodrigopardomeza/tu-berlin/debugger/lala.jar";
+ System.out.println(path_final);
+
+ FileOutputStream fout = new FileOutputStream(path_final);
+ JarOutputStream jarOut = new JarOutputStream(fout);
+
+
+ Class[] clazzs = getClasses(clazz.getPackage().getName());
+
+ for(int i = 0; i < clazzs.length; i++){
+ addClass(clazzs[i], jarOut);
+ }
+
+
+
+ jarOut.close();
+ fout.close();
+ return path_final;
+
+ } catch (Exception e){
+
+ }
+ return null;
+ }
+
+ private static void addClass(Class c, JarOutputStream jarOutputStream) throws IOException {
+ String path = c.getName().replace('.', '/') + ".class";
+ jarOutputStream.putNextEntry(new JarEntry(path));
+ jarOutputStream.write(toByteArray(c.getClassLoader().getResourceAsStream(path)));
+ jarOutputStream.closeEntry();
+ }
+
+ public static byte[] toByteArray(InputStream in) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] buf = new byte[0x1000];
+ while (true) {
+ int r = in.read(buf);
+ if (r == -1) {
+ break;
+ }
+ out.write(buf, 0, r);
+ }
+ return out.toByteArray();
+ }
+
+ /**
+ * Scans all classes accessible from the context class loader which belong to the given package and subpackages.
+ *
+ * @param packageName The base package
+ * @return The classes
+ * @throws ClassNotFoundException
+ * @throws IOException
+ */
+ private static Class[] getClasses(String packageName)
+ throws ClassNotFoundException, IOException {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ assert classLoader != null;
+ String path = packageName.replace('.', '/');
+ Enumeration<URL> resources = classLoader.getResources(path);
+ List<File> dirs = new ArrayList<File>();
+ while (resources.hasMoreElements()) {
+ URL resource = resources.nextElement();
+ dirs.add(new File(resource.getFile()));
+ }
+ ArrayList<Class> classes = new ArrayList<Class>();
+ for (File directory : dirs) {
+ classes.addAll(findClasses(directory, packageName));
+ }
+ return classes.toArray(new Class[classes.size()]);
+ }
+
+ /**
+ * Recursive method used to find all classes in a given directory and subdirs.
+ *
+ * @param directory The base directory
+ * @param packageName The package name for classes found inside the base directory
+ * @return The classes
+ * @throws ClassNotFoundException
+ */
+ private static List<Class> findClasses(File directory, String packageName) throws ClassNotFoundException {
+ List<Class> classes = new ArrayList<Class>();
+ if (!directory.exists()) {
+ return classes;
+ }
+ File[] files = directory.listFiles();
+ for (File file : files) {
+ //if (file.isDirectory()) {
+ // assert !file.getName().contains(".");
+ // classes.addAll(findClasses(file, packageName + "." + file.getName()));
+ //} else
+ if (file.getName().endsWith(".class")) {
+ classes.add(Class.forName(packageName + '.' + file.getName().substring(0, file.getName().length() - 6)));
+ }
+ }
+ return classes;
+ }
+}
+