You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by rp...@apache.org on 2021/05/18 22:17:33 UTC

[incubator-wayang] 02/04: [WAYANG-30] Flink cluster submitting plan testing

This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch debugger-sidecar
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 7f0ea3be052aefb4ba059d7b72776da190f32f18
Author: rodrigopardomeza <ro...@gmail.com>
AuthorDate: Tue May 18 18:15:19 2021 -0400

    [WAYANG-30] Flink cluster submitting plan testing
---
 .../org/apache/wayang/hackit/sidecar/test.java     | 225 +++++++++++++++++++++
 1 file changed, 225 insertions(+)

diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/test.java b/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/test.java
new file mode 100644
index 0000000..816384a
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/test.java
@@ -0,0 +1,225 @@
+package org.apache.wayang.hackit.sidecar;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Collector;
+import org.reflections.Reflections;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.function.Function;
+import java.util.jar.JarOutputStream;
+import java.util.jar.JarEntry;
+import java.util.zip.ZipEntry;
+
+public class test {
+
+    public static void main(String... args) throws Exception {
+        System.out.println(Arrays.toString(getJars()));
+        ExecutionEnvironment env = ExecutionEnvironment
+                .createRemoteEnvironment(
+                        "127.0.0.1",
+                        8081,
+                        //"/Users/rodrigopardomeza/tu-berlin/debugger/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/target/wayang-hackit-sidecar-0.6.0-SNAPSHOT.jar"
+                        getJars()
+                );
+
+        String clusterPath = "/mnt/example/";
+        //DataSet<String> data = env.readTextFile("/Users/rodrigopardomeza/flink/count");
+        //DataSet<String> data = env.readTextFile(clusterPath + "count");
+        DataSet<String> data = env.readTextFile("/mnt/example/count");
+
+        System.out.println(data.count());
+
+        DataSet<String> words = data.flatMap(
+                /*(FlatMapFunction<String, String>) (s,l) -> {
+                    for (String ss : s.split(" ")) {
+                        l.collect(ss);
+                    }
+                }*/
+                new FlatMapFunction<String, String>() {
+                    @Override
+                    public void flatMap(String s, Collector<String> collector) throws Exception {
+                        String[] wor = s.split(" ");
+                        for (String ss: wor
+                        ) {
+                            collector.collect(ss);
+                        }
+                    }
+                }
+
+        );
+        Function<Integer, Integer> lala = a -> a + 2;
+        DataSet<String> data2 = words
+                .filter((FilterFunction<String>) value -> value.startsWith("T"));
+        /*DataSet<String> words = data.flatMap(
+                new FlatMapFunction<String, String>() {
+                    @Override
+                    public void flatMap(String s, Collector<String> collector) throws Exception {
+                        String[] wor = s.split(" ");
+                        for (String ss: wor
+                             ) {
+                            collector.collect(ss);
+                        }
+                    }
+                }
+
+        );*/
+                        //.filter(line -> line.startsWith("T"))
+        data2.writeAsText( "/mnt/example/cluster_4", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
+                //.writeAsText("/Users/rodrigopardomeza/flink/cluster_babe");
+
+        env.execute();
+
+    }
+
+    private static String[] getJars(){
+        List<String> jars = new ArrayList<>(5);
+        List<Class> clazzs = Arrays.asList(new Class[]{test.class});
+
+        clazzs.stream().map(
+                test::getDeclaringJar
+        ).filter(
+                element -> element != null
+        ).forEach(jars::add);
+
+        return jars.toArray(new String[0]);
+    }
+
+    public static String getDeclaringJar(Class<?> cls) {
+        try {
+            final URL location = cls.getProtectionDomain().getCodeSource().getLocation();
+            final URI uri = location.toURI();
+            String path = uri.getPath();
+            if (path.endsWith(".jar")) {
+                return path;
+            } else {
+                System.out.println(
+                    String.format(
+                        "Class %s is not loaded from a JAR file, but from %s. Thus, cannot provide the JAR file.",
+                            cls,
+                            path
+                    )
+                );
+                path = createJAR(cls);
+                return path;
+            }
+        } catch (Exception e) {
+            System.out.println(
+                String.format(
+                        "Could not determine JAR file declaring %s.",
+                        cls
+                )
+            );
+        }
+        return null;
+    }
+
+    public static String createJAR(Class clazz){
+        try {
+            Path tmpCustomPrefix = Files.createTempDirectory("lala");
+            String path_final = tmpCustomPrefix.toString() + "/lala.jar";
+            //String path_final = "/Users/rodrigopardomeza/tu-berlin/debugger/lala.jar";
+            System.out.println(path_final);
+
+            FileOutputStream fout = new FileOutputStream(path_final);
+            JarOutputStream jarOut = new JarOutputStream(fout);
+
+
+            Class[] clazzs = getClasses(clazz.getPackage().getName());
+
+            for(int i = 0; i < clazzs.length; i++){
+                addClass(clazzs[i], jarOut);
+            }
+
+
+
+            jarOut.close();
+            fout.close();
+            return path_final;
+
+        } catch (Exception e){
+
+        }
+        return null;
+    }
+
+    private static void addClass(Class c, JarOutputStream jarOutputStream) throws IOException {
+        String path = c.getName().replace('.', '/') + ".class";
+        jarOutputStream.putNextEntry(new JarEntry(path));
+        jarOutputStream.write(toByteArray(c.getClassLoader().getResourceAsStream(path)));
+        jarOutputStream.closeEntry();
+    }
+
+    public static byte[] toByteArray(InputStream in) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        byte[] buf = new byte[0x1000];
+        while (true) {
+            int r = in.read(buf);
+            if (r == -1) {
+                break;
+            }
+            out.write(buf, 0, r);
+        }
+        return out.toByteArray();
+    }
+
+    /**
+     * Scans all classes accessible from the context class loader which belong to the given package and subpackages.
+     *
+     * @param packageName The base package
+     * @return The classes
+     * @throws ClassNotFoundException
+     * @throws IOException
+     */
+    private static Class[] getClasses(String packageName)
+            throws ClassNotFoundException, IOException {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        assert classLoader != null;
+        String path = packageName.replace('.', '/');
+        Enumeration<URL> resources = classLoader.getResources(path);
+        List<File> dirs = new ArrayList<File>();
+        while (resources.hasMoreElements()) {
+            URL resource = resources.nextElement();
+            dirs.add(new File(resource.getFile()));
+        }
+        ArrayList<Class> classes = new ArrayList<Class>();
+        for (File directory : dirs) {
+            classes.addAll(findClasses(directory, packageName));
+        }
+        return classes.toArray(new Class[classes.size()]);
+    }
+
+    /**
+     * Recursive method used to find all classes in a given directory and subdirs.
+     *
+     * @param directory   The base directory
+     * @param packageName The package name for classes found inside the base directory
+     * @return The classes
+     * @throws ClassNotFoundException
+     */
+    private static List<Class> findClasses(File directory, String packageName) throws ClassNotFoundException {
+        List<Class> classes = new ArrayList<Class>();
+        if (!directory.exists()) {
+            return classes;
+        }
+        File[] files = directory.listFiles();
+        for (File file : files) {
+            //if (file.isDirectory()) {
+            //    assert !file.getName().contains(".");
+            //    classes.addAll(findClasses(file, packageName + "." + file.getName()));
+            //} else
+            if (file.getName().endsWith(".class")) {
+                classes.add(Class.forName(packageName + '.' + file.getName().substring(0, file.getName().length() - 6)));
+            }
+        }
+        return classes;
+    }
+}
+