You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:37 UTC
[58/73] [abbrv] prefix all projects in addons and quickstarts with
flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Client.java
deleted file mode 100644
index 6d4c7b5..0000000
--- a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.jar.JarFile;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-
-/**
- * Command line client that starts a Flink session (JobManager + TaskManagers) on a YARN cluster.
- *
- * All classes in this package contain code taken from
- * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
- * and
- * https://github.com/hortonworks/simple-yarn-app
- * and
- * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
- *
- * The Flink jar is uploaded to HDFS by this client.
- * The application master and all the TaskManager containers get the jar file downloaded
- * by YARN into their local fs.
- *
- */
-public class Client {
-	private static final Log LOG = LogFactory.getLog(Client.class);
-
-	/**
-	 * Command Line argument options
-	 */
-	private static final Option QUERY = new Option("q","query",false, "Display available YARN resources (memory, cores)");
-	// --- or ---
-	private static final Option VERBOSE = new Option("v","verbose",false, "Verbose debug mode");
-	private static final Option GEN_CONF = new Option("g","generateConf",false, "Place default configuration file in current directory");
-	private static final Option QUEUE = new Option("qu","queue",true, "Specify YARN queue.");
-	private static final Option SHIP_PATH = new Option("s","ship",true, "Ship files in the specified directory");
-	private static final Option FLINK_CONF_DIR = new Option("c","confDir",true, "Path to Flink configuration directory");
-	private static final Option FLINK_JAR = new Option("j","jar",true, "Path to Flink jar file");
-	private static final Option JM_MEMORY = new Option("jm","jobManagerMemory",true, "Memory for JobManager Container [in MB]");
-	private static final Option TM_MEMORY = new Option("tm","taskManagerMemory",true, "Memory per TaskManager Container [in MB]");
-	private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
-	private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
-			+ " TaskTrackers)");
-
-	/**
-	 * Constants
-	 */
-	// environment variable names (set for the ApplicationMaster container)
-	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-	public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
-	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-	public final static String ENV_APP_ID = "_APP_ID";
-	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
-	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
-	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
-
-	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-
-
-
-	private Configuration conf;
-
-	/**
-	 * Parses the command line, uploads the Flink jar, the configuration file and any ship files to
-	 * the default Hadoop file system, submits the ApplicationMaster to YARN and then polls the
-	 * application report until the application is FINISHED, KILLED or FAILED.
-	 *
-	 * Invalid arguments or insufficient cluster resources terminate the JVM via
-	 * {@code System.exit(1)}. A shutdown hook kills the YARN application and deletes the
-	 * uploaded files when the JVM exits.
-	 *
-	 * @param args command line arguments (see {@link #printUsage()})
-	 * @throws Exception on YARN or file system errors
-	 */
-	public void run(String[] args) throws Exception {
-
-		if(UserGroupInformation.isSecurityEnabled()) {
-			throw new RuntimeException("Flink YARN client does not have security support right now. "
-					+ "File a bug, we will fix it asap");
-		}
-		//
-		// Command Line Options
-		//
-		Options options = new Options();
-		options.addOption(VERBOSE);
-		options.addOption(FLINK_CONF_DIR);
-		options.addOption(FLINK_JAR);
-		options.addOption(JM_MEMORY);
-		options.addOption(TM_MEMORY);
-		options.addOption(TM_CORES);
-		options.addOption(CONTAINER);
-		options.addOption(GEN_CONF);
-		options.addOption(QUEUE);
-		options.addOption(QUERY);
-		options.addOption(SHIP_PATH);
-
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd = null;
-		try {
-			cmd = parser.parse( options, args);
-		} catch(org.apache.commons.cli.ParseException pe) {
-			// ParseException is the superclass of MissingOptionException; catching it also turns
-			// unrecognized options and missing option arguments into a usage message instead of
-			// a stack trace.
-			System.out.println(pe.getMessage());
-			printUsage();
-			System.exit(1);
-		}
-
-		// configure console logging unless the user supplied a log4j configuration
-		if (System.getProperty("log4j.configuration") == null) {
-			Logger root = Logger.getRootLogger();
-			root.removeAllAppenders();
-			PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
-			ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
-			root.addAppender(appender);
-			if(cmd.hasOption(VERBOSE.getOpt())) {
-				root.setLevel(Level.DEBUG);
-				LOG.debug("CLASSPATH: "+System.getProperty("java.class.path"));
-			} else {
-				root.setLevel(Level.INFO);
-			}
-		}
-
-
-		// Jar Path: user supplied, otherwise the jar this class was loaded from
-		Path localJarPath;
-		if(cmd.hasOption(FLINK_JAR.getOpt())) {
-			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
-			if(!userPath.startsWith("file://")) {
-				userPath = "file://" + userPath;
-			}
-			localJarPath = new Path(userPath);
-		} else {
-			localJarPath = new Path("file://"+Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
-		}
-
-		if(cmd.hasOption(GEN_CONF.getOpt())) {
-			LOG.info("Placing default configuration in current directory");
-			File outFile = generateDefaultConf(localJarPath);
-			LOG.info("File written to "+outFile.getAbsolutePath());
-			System.exit(0);
-		}
-
-		// Conf Path
-		Path confPath = null;
-		String confDirPath = ""; // either empty or ending with '/'
-		if(cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
-			confDirPath = cmd.getOptionValue(FLINK_CONF_DIR.getOpt())+"/";
-			File confFile = new File(confDirPath+CONFIG_FILE_NAME);
-			if(!confFile.exists()) {
-				LOG.fatal("Unable to locate configuration file in "+confFile);
-				System.exit(1);
-			}
-			confPath = new Path(confFile.getAbsolutePath());
-		} else {
-			System.out.println("No configuration file has been specified");
-
-			// no configuration path given.
-			// -> see if there is one in the current directory
-			File currDir = new File(".");
-			File[] candidates = currDir.listFiles(new FilenameFilter() {
-				@Override
-				public boolean accept(final File dir, final String name) {
-					return name != null && name.endsWith(".yaml");
-				}
-			});
-			if(candidates == null || candidates.length == 0) {
-				System.out.println("No configuration file has been found in current directory.\n"
-						+ "Copying default.");
-				File outFile = generateDefaultConf(localJarPath);
-				confPath = new Path(outFile.toURI());
-			} else {
-				if(candidates.length > 1) {
-					System.out.println("Multiple .yaml configuration files were found in the current directory\n"
-							+ "Please specify one explicitly");
-					System.exit(1);
-				} else if(candidates.length == 1) {
-					confPath = new Path(candidates[0].toURI());
-				}
-			}
-		}
-		List<File> shipFiles = new ArrayList<File>();
-		// path to directory to ship
-		if(cmd.hasOption(SHIP_PATH.getOpt())) {
-			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
-			File shipDir = new File(shipPath);
-			if(shipDir.isDirectory()) {
-				File[] shipDirContents = shipDir.listFiles(new FilenameFilter() {
-					@Override
-					public boolean accept(File dir, String name) {
-						return !(name.equals(".") || name.equals("..") );
-					}
-				});
-				if(shipDirContents == null) {
-					// File.listFiles() returns null if an I/O error occurs
-					LOG.warn("Unable to list the contents of the ship directory " + shipDir);
-				} else {
-					shipFiles = new ArrayList<File>(Arrays.asList(shipDirContents));
-				}
-			} else {
-				LOG.warn("Ship directory is not a directory!");
-			}
-		}
-		boolean hasLog4j = false;
-		//check if there is a log4j file next to the configuration
-		if(confDirPath.length() > 0) {
-			File l4j = new File(confDirPath+"/log4j.properties");
-			if(l4j.exists()) {
-				shipFiles.add(l4j);
-				hasLog4j = true;
-			}
-		}
-
-		// queue
-		String queue = "default";
-		if(cmd.hasOption(QUEUE.getOpt())) {
-			queue = cmd.getOptionValue(QUEUE.getOpt());
-		}
-
-		// JobManager Memory
-		int jmMemory = 512;
-		if(cmd.hasOption(JM_MEMORY.getOpt())) {
-			jmMemory = Integer.parseInt(cmd.getOptionValue(JM_MEMORY.getOpt()));
-		}
-
-		// Task Managers memory
-		int tmMemory = 1024;
-		if(cmd.hasOption(TM_MEMORY.getOpt())) {
-			tmMemory = Integer.parseInt(cmd.getOptionValue(TM_MEMORY.getOpt()));
-		}
-
-		// Task Managers vcores
-		int tmCores = 1;
-		if(cmd.hasOption(TM_CORES.getOpt())) {
-			tmCores = Integer.parseInt(cmd.getOptionValue(TM_CORES.getOpt()));
-		}
-		// NOTE(review): this passes the path of the YAML *file*, not its directory; confirm that
-		// GlobalConfiguration.loadConfiguration accepts a file, otherwise the JobManager port below
-		// always falls back to the default.
-		Utils.getFlinkConfiguration(confPath.toUri().getPath());
-		int jmPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0);
-		if(jmPort == 0) {
-			LOG.warn("Unable to find job manager port in configuration!");
-			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
-		}
-		conf = Utils.initializeYarnConfiguration();
-
-		// intialize HDFS
-		LOG.info("Copy App Master jar from local filesystem and add to local environment");
-		// Copy the application master jar to the filesystem
-		// Create a local resource to point to the destination jar path
-		final FileSystem fs = FileSystem.get(conf);
-
-		if(fs.getScheme().startsWith("file")) {
-			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
-					+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values. "
-					+ "The Flink YARN client needs to store its files in a distributed file system");
-		}
-
-		// Create yarnClient
-		final YarnClient yarnClient = YarnClient.createYarnClient();
-		yarnClient.init(conf);
-		yarnClient.start();
-
-		// Query cluster for metrics (showClusterMetrics exits the JVM)
-		if(cmd.hasOption(QUERY.getOpt())) {
-			showClusterMetrics(yarnClient);
-		}
-		if(!cmd.hasOption(CONTAINER.getOpt())) {
-			LOG.fatal("Missing required argument "+CONTAINER.getOpt());
-			printUsage();
-			yarnClient.stop();
-			System.exit(1);
-		}
-
-		// TM Count
-		final int taskManagerCount = Integer.parseInt(cmd.getOptionValue(CONTAINER.getOpt()));
-
-		System.out.println("Using values:");
-		System.out.println("\tContainer Count = "+taskManagerCount);
-		System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
-		System.out.println("\tConfiguration file = "+confPath.toUri().getPath());
-		System.out.println("\tJobManager memory = "+jmMemory);
-		System.out.println("\tTaskManager memory = "+tmMemory);
-		System.out.println("\tTaskManager cores = "+tmCores);
-
-		// Create application via yarnClient
-		YarnClientApplication app = yarnClient.createApplication();
-		GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
-		Resource maxRes = appResponse.getMaximumResourceCapability();
-		if(tmMemory > maxRes.getMemory() || tmCores > maxRes.getVirtualCores()) {
-			LOG.fatal("The cluster does not have the requested resources for the TaskManagers available!\n"
-					+ "Maximum Memory: "+maxRes.getMemory() +", Maximum Cores: "+maxRes.getVirtualCores());
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if(jmMemory > maxRes.getMemory() ) {
-			LOG.fatal("The cluster does not have the requested resources for the JobManager available!\n"
-					+ "Maximum Memory: "+maxRes.getMemory());
-			yarnClient.stop();
-			System.exit(1);
-		}
-		int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount;
-		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
-		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
-			LOG.fatal("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
-					+ "There are currently only "+freeClusterMem.totalFreeMemory+"MB available.");
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if( tmMemory > freeClusterMem.containerLimit) {
-			LOG.fatal("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
-					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if( jmMemory > freeClusterMem.containerLimit) {
-			LOG.fatal("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
-					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
-			yarnClient.stop();
-			System.exit(1);
-		}
-
-		// respect custom JVM options in the YAML file
-		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
-		// Set up the container launch context for the application master
-		ContainerLaunchContext amContainer = Records
-				.newRecord(ContainerLaunchContext.class);
-
-		String amCommand = "$JAVA_HOME/bin/java"
-					+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
-		if(hasLog4j) {
-			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
-		}
-		amCommand 	+= " org.apache.flink.yarn.ApplicationMaster" + " "
-					+ " 1>"
-					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
-					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
-		amContainer.setCommands(Collections.singletonList(amCommand));
-
-		System.err.println("amCommand="+amCommand);
-
-		// Set-up ApplicationSubmissionContext for the application
-		ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
-		final ApplicationId appId = appContext.getApplicationId();
-		// Resolve the home directory once: the FileSystem instance is closed further below, but the
-		// home directory is still needed for the ApplicationMaster environment.
-		final Path homeDir = fs.getHomeDirectory();
-
-		// Setup jar for ApplicationMaster
-		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-		Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), localJarPath, appMasterJar, homeDir);
-		Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), confPath, flinkConf, homeDir);
-		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
-		localResources.put("flink.jar", appMasterJar);
-		localResources.put("flink-conf.yaml", flinkConf);
-
-
-		// setup security tokens (code from apache storm)
-		// layout: [0] = jar, [1] = conf, [2] = application directory, [3..] = ship files
-		final Path[] paths = new Path[3 + shipFiles.size()];
-		StringBuilder envShipFileList = new StringBuilder();
-		// upload ship files
-		for (int i = 0; i < shipFiles.size(); i++) {
-			File shipFile = shipFiles.get(i);
-			LocalResource shipResources = Records.newRecord(LocalResource.class);
-			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-			paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
-					shipLocalPath, shipResources, homeDir);
-			localResources.put(shipFile.getName(), shipResources);
-
-			envShipFileList.append(paths[3 + i]);
-			if(i+1 < shipFiles.size()) {
-				envShipFileList.append(',');
-			}
-		}
-
-		paths[0] = remotePathJar;
-		paths[1] = remotePathConf;
-		paths[2] = new Path(homeDir, ".flink/" + appId.toString() + "/");
-		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-		fs.setPermission(paths[2], permission); // set permission for path.
-		Utils.setTokensFor(amContainer, paths, this.conf);
-
-
-		amContainer.setLocalResources(localResources);
-		fs.close();
-
-		// Setup CLASSPATH for ApplicationMaster
-		Map<String, String> appMasterEnv = new HashMap<String, String>();
-		Utils.setupEnv(conf, appMasterEnv);
-		// set configuration values
-		appMasterEnv.put(Client.ENV_TM_COUNT, String.valueOf(taskManagerCount));
-		appMasterEnv.put(Client.ENV_TM_CORES, String.valueOf(tmCores));
-		appMasterEnv.put(Client.ENV_TM_MEMORY, String.valueOf(tmMemory));
-		appMasterEnv.put(Client.FLINK_JAR_PATH, remotePathJar.toString() );
-		appMasterEnv.put(Client.ENV_APP_ID, appId.toString());
-		appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, homeDir.toString());
-		appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
-		appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-
-		amContainer.setEnvironment(appMasterEnv);
-
-		// Set up resource type requirements for ApplicationMaster
-		Resource capability = Records.newRecord(Resource.class);
-		capability.setMemory(jmMemory);
-		capability.setVirtualCores(1);
-
-		appContext.setApplicationName("Flink"); // application name
-		appContext.setAMContainerSpec(amContainer);
-		appContext.setResource(capability);
-		appContext.setQueue(queue);
-
-		// file that we write into the conf/ dir containing the jobManager address.
-		final File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
-
-		// on JVM exit: kill the application, remove uploaded files and the address file
-		Runtime.getRuntime().addShutdownHook(new Thread() {
-			@Override
-			public void run() {
-				try {
-					LOG.info("Killing the Flink-YARN application.");
-					yarnClient.killApplication(appId);
-					LOG.info("Deleting files in "+paths[2]);
-					FileSystem shutFS = FileSystem.get(conf);
-					shutFS.delete(paths[2], true); // delete conf and jar file.
-					shutFS.close();
-				} catch (Exception e) {
-					LOG.warn("Exception while killing the YARN application", e);
-				}
-				try {
-					addrFile.delete();
-				} catch (Exception e) {
-					LOG.warn("Exception while deleting the jobmanager address file", e);
-				}
-				LOG.info("YARN Client is shutting down");
-				yarnClient.stop();
-			}
-		});
-
-		LOG.info("Submitting application master " + appId);
-		yarnClient.submitApplication(appContext);
-		ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-		YarnApplicationState appState = appReport.getYarnApplicationState();
-		boolean told = false;
-		char[] el = { '/', '|', '\\', '-'};
-		int i = 0;
-		while (appState != YarnApplicationState.FINISHED
-				&& appState != YarnApplicationState.KILLED
-				&& appState != YarnApplicationState.FAILED) {
-			if(!told && appState ==  YarnApplicationState.RUNNING) {
-				System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
-				System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
-				// write jobmanager connect information
-				PrintWriter out = new PrintWriter(addrFile);
-				try {
-					out.println(appReport.getHost()+":"+jmPort);
-				} finally {
-					out.close();
-				}
-				addrFile.setReadable(true, false); // readable for all.
-				told = true;
-			}
-			if(!told) {
-				// spinner while waiting for the application to switch to RUNNING
-				System.err.print(el[i++]+"\r");
-				if(i == el.length) {
-					i = 0;
-				}
-				Thread.sleep(500); // wait for the application to switch to RUNNING
-			} else {
-				Thread.sleep(5000);
-			}
-
-			appReport = yarnClient.getApplicationReport(appId);
-			appState = appReport.getYarnApplicationState();
-		}
-
-		LOG.info("Application " + appId + " finished with"
-				+ " state " + appState + " at " + appReport.getFinishTime());
-		if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
-			LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
-		}
-
-	}
-
-	/** Aggregated free memory (MB) of all RUNNING nodes and the largest free amount on a single node. */
-	private static class ClusterResourceDescription {
-		public int totalFreeMemory;
-		public int containerLimit;
-	}
-
-	/**
-	 * Sums up the free memory (capability minus used) of all RUNNING nodes and records the largest
-	 * per-node free amount, which is used as the upper bound for a single container.
-	 */
-	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
-		ClusterResourceDescription crd = new ClusterResourceDescription();
-		crd.totalFreeMemory = 0;
-		crd.containerLimit = 0;
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-		for(NodeReport rep : nodes) {
-			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
-			crd.totalFreeMemory += free;
-			if(free > crd.containerLimit) {
-				crd.containerLimit = free;
-			}
-		}
-		return crd;
-	}
-
-	/** Prints the required and the optional command line arguments to stdout. */
-	private void printUsage() {
-		System.out.println("Usage:");
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setWidth(200);
-		formatter.setLeftPadding(5);
-		formatter.setSyntaxPrefix("   Required");
-		Options req = new Options();
-		req.addOption(CONTAINER);
-		formatter.printHelp(" ", req);
-
-		formatter.setSyntaxPrefix("   Optional");
-		Options opt = new Options();
-		opt.addOption(VERBOSE);
-		opt.addOption(JM_MEMORY);
-		opt.addOption(TM_MEMORY);
-		opt.addOption(TM_CORES);
-		opt.addOption(QUERY);
-		opt.addOption(QUEUE);
-		opt.addOption(SHIP_PATH); // parsed by run(), so it must be discoverable
-		formatter.printHelp(" ", opt);
-	}
-
-	/**
-	 * Prints node and queue information of the cluster, then stops the client and exits the JVM
-	 * with status 0.
-	 */
-	private void showClusterMetrics(YarnClient yarnClient)
-			throws YarnException, IOException {
-		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-		System.out.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-		final String format = "|%-16s |%-16s %n";
-		System.out.printf("|Property         |Value          %n");
-		System.out.println("+---------------------------------------+");
-		int totalMemory = 0;
-		int totalCores = 0;
-		for(NodeReport rep : nodes) {
-			final Resource res = rep.getCapability();
-			totalMemory += res.getMemory();
-			totalCores += res.getVirtualCores();
-			System.out.format(format, "NodeID", rep.getNodeId());
-			System.out.format(format, "Memory", res.getMemory()+" MB");
-			System.out.format(format, "vCores", res.getVirtualCores());
-			System.out.format(format, "HealthReport", rep.getHealthReport());
-			System.out.format(format, "Containers", rep.getNumContainers());
-			System.out.println("+---------------------------------------+");
-		}
-		System.out.println("Summary: totalMemory "+totalMemory+" totalCores "+totalCores);
-		List<QueueInfo> qInfo = yarnClient.getAllQueues();
-		for(QueueInfo q : qInfo) {
-			System.out.println("Queue: "+q.getQueueName()+", Current Capacity: "+q.getCurrentCapacity()+" Max Capacity: "+q.getMaximumCapacity()+" Applications: "+q.getApplications().size());
-		}
-		yarnClient.stop();
-		System.exit(0);
-	}
-
-	/**
-	 * Writes a default {@code flink-conf.yaml} into the current working directory.
-	 *
-	 * The file is taken from the given jar; if the jar has no such entry, this class's
-	 * {@code getResourceAsStream("flink-conf.yaml")} is used as a fallback.
-	 *
-	 * @param localJarPath jar to read the default configuration from
-	 * @return the written file
-	 * @throws RuntimeException if no default configuration is found or the target file already exists
-	 */
-	private File generateDefaultConf(Path localJarPath) throws IOException,
-			FileNotFoundException {
-		JarFile jar = null;
-		try {
-			jar = new JarFile(localJarPath.toUri().getPath());
-		} catch(FileNotFoundException fne) {
-			LOG.fatal("Unable to access jar file. Specify jar file or configuration file.", fne);
-			System.exit(1);
-		}
-		try {
-			// JarFile.getInputStream(null) throws a NullPointerException, so look the entry up
-			// first; otherwise the class-path fallback below could never be reached.
-			java.util.zip.ZipEntry confEntry = jar.getEntry("flink-conf.yaml");
-			InputStream confStream = (confEntry != null) ? jar.getInputStream(confEntry) : null;
-
-			if(confStream == null) {
-				LOG.warn("Given jar file does not contain yaml conf.");
-				confStream = this.getClass().getResourceAsStream("flink-conf.yaml");
-				if(confStream == null) {
-					throw new RuntimeException("Unable to find flink-conf in jar file");
-				}
-			}
-			try {
-				File outFile = new File("flink-conf.yaml");
-				if(outFile.exists()) {
-					throw new RuntimeException("File unexpectedly exists");
-				}
-				FileOutputStream outputStream = new FileOutputStream(outFile);
-				try {
-					int read = 0;
-					byte[] bytes = new byte[1024];
-					while ((read = confStream.read(bytes)) != -1) {
-						outputStream.write(bytes, 0, read);
-					}
-				} finally {
-					outputStream.close();
-				}
-				return outFile;
-			} finally {
-				confStream.close();
-			}
-		} finally {
-			jar.close();
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-		Client c = new Client();
-		c.run(args);
-	}
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Utils.java
deleted file mode 100644
index b72b9bc..0000000
--- a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.yarn;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringInterner;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
-public class Utils {
-
- private static final Log LOG = LogFactory.getLog(Utils.class);
- private static final int HEAP_LIMIT_CAP = 500;
-
-
- public static void copyJarContents(String prefix, String pathToJar) throws IOException {
- LOG.info("Copying jar (location: "+pathToJar+") to prefix "+prefix);
-
- JarFile jar = null;
- jar = new JarFile(pathToJar);
- Enumeration<JarEntry> enumr = jar.entries();
- byte[] bytes = new byte[1024];
- while(enumr.hasMoreElements()) {
- JarEntry entry = enumr.nextElement();
- if(entry.getName().startsWith(prefix)) {
- if(entry.isDirectory()) {
- File cr = new File(entry.getName());
- cr.mkdirs();
- continue;
- }
- InputStream inStream = jar.getInputStream(entry);
- File outFile = new File(entry.getName());
- if(outFile.exists()) {
- throw new RuntimeException("File unexpectedly exists");
- }
- FileOutputStream outputStream = new FileOutputStream(outFile);
- int read = 0;
- while ((read = inStream.read(bytes)) != -1) {
- outputStream.write(bytes, 0, read);
- }
- inStream.close(); outputStream.close();
- }
- }
- jar.close();
- }
-
-	/**
-	 * Calculates the maximum heap size ({@code -Xmx}, in MB) for a JVM that runs inside a YARN
-	 * container with {@code memory} MB.
-	 *
-	 * A JVM allocates more than its heap, and YARN quickly kills containers that exceed their
-	 * memory limit, so only 85% of the container memory is handed out as heap. If the remaining
-	 * 15% would be larger than {@code HEAP_LIMIT_CAP} (500 MB), only {@code HEAP_LIMIT_CAP} is
-	 * reserved instead and the heap becomes {@code memory - HEAP_LIMIT_CAP}.
-	 *
-	 * @param memory container memory in MB
-	 * @return heap size in MB
-	 */
-	public static int calculateHeapSize(int memory) {
-		final int eightyFivePercent = (int)((float)memory*0.85);
-		final int reserved = memory - eightyFivePercent;
-		return (reserved > HEAP_LIMIT_CAP) ? memory - HEAP_LIMIT_CAP : eightyFivePercent;
-	}
-
-	/**
-	 * Loads the Flink configuration through {@link GlobalConfiguration}, from which later
-	 * static lookups ({@code GlobalConfiguration.getString/getInteger}) read.
-	 *
-	 * NOTE(review): the client passes the path of the YAML file here; confirm that
-	 * {@code loadConfiguration} accepts a file and not only a directory.
-	 *
-	 * @param confDir location handed unchanged to {@code GlobalConfiguration.loadConfiguration}
-	 */
-	public static void getFlinkConfiguration(String confDir) {
-		final String location = confDir;
-		GlobalConfiguration.loadConfiguration(location);
-	}
-
-	/**
-	 * Replaces the class loader of {@code conf} with a new {@link URLClassLoader}. The new loader
-	 * has {@code path} as its only URL and the configuration's previous class loader as its parent.
-	 * Presumably this lets Hadoop find the *-site.xml files in {@code path}; TODO confirm against
-	 * the Hadoop {@code Configuration} resource lookup.
-	 *
-	 * @param conf Hadoop configuration to modify
-	 * @param path directory (or file) to add
-	 * @throws RuntimeException if {@code path} cannot be converted into a URL
-	 */
-	private static void addPathToConfig(Configuration conf, File path) {
-		// chain-in a new classloader
-		URL fileUrl = null;
-		try {
-			// File.toURL() is deprecated because it does not escape characters that are illegal
-			// in URLs (e.g. spaces); going through toURI() yields a correctly encoded URL.
-			fileUrl = path.toURI().toURL();
-		} catch (MalformedURLException e) {
-			throw new RuntimeException("Erroneous config file path", e);
-		}
-		URL[] urls = {fileUrl};
-		ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader());
-		conf.setClassLoader(cl);
-	}
-
- private static void setDefaultConfValues(Configuration conf) {
- if(conf.get("fs.hdfs.impl",null) == null) {
- conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- }
- if(conf.get("fs.file.impl",null) == null) {
- conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
- }
- }
-
-	/**
-	 * Creates a {@link YarnConfiguration} and points it at the Hadoop configuration directory.
-	 *
-	 * The directory is looked up in this order, and the first match wins:
-	 * 1. the {@code ConfigConstants.PATH_HADOOP_CONFIG} entry of the Flink configuration,
-	 * 2. the environment variables YARN_CONF_DIR, HADOOP_CONF_DIR, HADOOP_CONF_PATH,
-	 * 3. {@code <hadoop home>/etc/hadoop}, then {@code <hadoop home>/conf}.
-	 * If the Hadoop home cannot be determined, the JVM exits with status 1. The default file
-	 * system implementations are always filled in afterwards.
-	 *
-	 * @return the initialized configuration
-	 */
-	public static Configuration initializeYarnConfiguration() {
-		final Configuration conf = new YarnConfiguration();
-		File hadoopConfDir = null;
-
-		// 1) explicit setting in the Flink configuration
-		final String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-		if (configuredHadoopConfig != null) {
-			LOG.info("Using hadoop configuration path from " + ConfigConstants.PATH_HADOOP_CONFIG + " setting.");
-			hadoopConfDir = new File(configuredHadoopConfig);
-		}
-
-		// 2) environment variables
-		if (hadoopConfDir == null) {
-			for (String envName : new String[] { "YARN_CONF_DIR", "HADOOP_CONF_DIR", "HADOOP_CONF_PATH" }) {
-				final String envValue = System.getenv(envName);
-				if (envValue != null) {
-					LOG.info("Found "+envName+", adding it to configuration");
-					hadoopConfDir = new File(envValue);
-					break;
-				}
-			}
-		}
-
-		// 3) derive it from the Hadoop home directory
-		if (hadoopConfDir == null) {
-			LOG.info("Could not find HADOOP_CONF_PATH, using HADOOP_HOME.");
-			String hadoopHome = null;
-			try {
-				hadoopHome = Shell.getHadoopHome();
-			} catch (IOException e) {
-				LOG.fatal("Unable to get hadoop home. Please set HADOOP_HOME variable!", e);
-				System.exit(1);
-			}
-			final File etcHadoop = new File(hadoopHome+"/etc/hadoop");
-			final File legacyConf = new File(hadoopHome+"/conf");
-			if (etcHadoop.exists()) {
-				LOG.info("Found configuration using hadoop home.");
-				hadoopConfDir = etcHadoop;
-			} else if (legacyConf.exists()) {
-				hadoopConfDir = legacyConf;
-			}
-		}
-
-		if (hadoopConfDir != null) {
-			addPathToConfig(conf, hadoopConfDir);
-		}
-		setDefaultConfValues(conf);
-		return conf;
-	}
-
- public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
- for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
- addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
- }
- addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
- }
-
-
- /**
-  * Uploads a local file to {@code <homedir>/.flink/<appId>/<file name>} on
-  * {@code fs} and describes the uploaded copy in {@code appMasterJar}.
-  *
-  * @param conf currently unused
-  * @param fs target file system (usually HDFS)
-  * @param appId application id, used as the per-application directory name
-  * @param localRsrcPath local file to upload
-  * @param appMasterJar resource description filled in via {@link #registerLocalResource}
-  * @param homedir base directory on {@code fs}
-  * @return path to the remote file (usually on HDFS)
-  * @throws IOException if copying the file or reading its remote status fails
-  */
- public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir)
-         throws IOException {
-     Path remotePath = new Path(homedir, ".flink/" + appId + "/" + localRsrcPath.getName());
-     LOG.info("Copying from " + localRsrcPath + " to " + remotePath);
-     fs.copyFromLocalFile(localRsrcPath, remotePath);
-     registerLocalResource(fs, remotePath, appMasterJar);
-     return remotePath;
- }
-
- /**
-  * Describes the already-uploaded file {@code remoteRsrcPath} in {@code localResource}:
-  * its URL, plus size and modification time as reported by {@code fs}, with type
-  * FILE and PUBLIC visibility.
-  *
-  * @throws IOException if the file status cannot be read from {@code fs}
-  */
- public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
-     final FileStatus remoteStatus = fs.getFileStatus(remoteRsrcPath);
-     localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
-     localResource.setSize(remoteStatus.getLen());
-     localResource.setTimestamp(remoteStatus.getModificationTime());
-     localResource.setType(LocalResourceType.FILE);
-     localResource.setVisibility(LocalResourceVisibility.PUBLIC);
- }
-
- /**
-  * Collects delegation tokens for the NameNodes of {@code paths} plus all tokens held
-  * by the current user, serializes them, and attaches them to {@code amContainer}.
-  *
-  * @throws IOException if obtaining or serializing the tokens fails
-  */
- public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
-     final Credentials credentials = new Credentials();
-     // HDFS tokens for the given paths
-     TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
-     // tokens already held by the current user
-     for (Token<? extends TokenIdentifier> userToken : UserGroupInformation.getCurrentUser().getTokens()) {
-         final Text alias = new Text(userToken.getIdentifier());
-         LOG.info("Adding user token " + alias + " with " + userToken);
-         credentials.addToken(alias, userToken);
-     }
-     final DataOutputBuffer buffer = new DataOutputBuffer();
-     credentials.writeTokenStorageToStream(buffer);
-     final int length = buffer.getLength();
-     LOG.debug("Wrote tokens. Credentials buffer length: " + length);
-     amContainer.setTokens(ByteBuffer.wrap(buffer.getData(), 0, length));
- }
-
- /**
-  * Logs, at INFO level, every entry of the current working directory, each prefixed
-  * with the absolute path of that directory. Logs nothing if the directory cannot be listed.
-  *
-  * @param logger the log to write to
-  */
- public static void logFilesInCurrentDirectory(final Log logger) {
-     final File cwd = new File(".");
-     final String[] entries = cwd.list();
-     if (entries == null) {
-         return;
-     }
-     for (String entry : entries) {
-         logger.info(cwd.getAbsolutePath() + "/" + entry);
-     }
- }
-
- /**
-  * Appends {@code value} to the variable {@code variable} in {@code environment},
-  * separated from any existing value by {@link File#pathSeparator}, or sets it to
-  * {@code value} if not present yet. Key and value are interned with
-  * {@code StringInterner.weakIntern}.
-  * <p>
-  * Copied method from org.apache.hadoop.yarn.util.Apps.
-  * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
-  * by https://issues.apache.org/jira/browse/YARN-1931
-  */
- public static void addToEnvironment(Map<String, String> environment,
-         String variable, String value) {
-     final String existing = environment.get(variable);
-     final String combined = (existing == null) ? value : existing + File.pathSeparator + value;
-     environment.put(StringInterner.weakIntern(variable), StringInterner.weakIntern(combined));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-addons/yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
deleted file mode 100644
index b541317..0000000
--- a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-
-/**
- * Launches a Flink TaskManager inside a YARN container. It appends the container's
- * LOCAL_DIRS as the TaskManager configuration-directory argument and runs
- * {@link TaskManager#main} as the user named by {@link Client#ENV_CLIENT_USERNAME},
- * carrying over the tokens of the current (YARN daemon) user.
- */
-public class YarnTaskManagerRunner {
-
-    private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
-
-    public static void main(final String[] args) throws IOException {
-        Map<String, String> envs = System.getenv();
-        final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-        final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
-
-        // configure local directory
-        final String[] newArgs = Arrays.copyOf(args, args.length + 2);
-        newArgs[newArgs.length-2] = "-"+TaskManager.ARG_CONF_DIR;
-        newArgs[newArgs.length-1] = localDirs;
-        LOG.info("Setting log path "+localDirs);
-        LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
-            + " user to execute Flink TaskManager to '"+yarnClientUsername+"'");
-        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-        // forward the daemon user's tokens so the TaskManager can use them as the client user
-        for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-            ugi.addToken(toks);
-        }
-        ugi.doAs(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                try {
-                    TaskManager.main(newArgs);
-                } catch (Exception e) {
-                    // Report through the class logger rather than raw stderr so the failure
-                    // ends up in the container's log configuration.
-                    LOG.fatal("Error while running the Flink TaskManager", e);
-                }
-                return null;
-            }
-        });
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 8127d50..efbff93 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -68,13 +68,13 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>spargel</artifactId>
+ <artifactId>flink-spargel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>avro</artifactId>
+ <artifactId>flink-avro</artifactId>
<version>${project.version}</version>
</dependency>
@@ -117,7 +117,7 @@
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>yarn</artifactId>
+ <artifactId>flink-yarn</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
@@ -170,7 +170,7 @@
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>hbase</artifactId>
+ <artifactId>flink-hbase</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/pom.xml b/flink-quickstart/flink-quickstart-java/pom.xml
new file mode 100644
index 0000000..f51bf7f
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/pom.xml
@@ -0,0 +1,19 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-quickstart</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-quickstart-java</artifactId>
+ <packaging>jar</packaging>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java b/flink-quickstart/flink-quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
new file mode 100644
index 0000000..0192712
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.quickstart;
+
+/**
+ * Intentionally empty placeholder. It is present only so that javadoc
+ * generation has a class to process for the "quickstart-java" project.
+ */
+public class Dummy {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/resources/META-INF/maven/archetype.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/META-INF/maven/archetype.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/META-INF/maven/archetype.xml
new file mode 100644
index 0000000..221c775
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/META-INF/maven/archetype.xml
@@ -0,0 +1,8 @@
+<archetype xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
+ <id>flink-quickstart</id>
+ <sources>
+ <source>src/main/java/Job.java</source>
+ <source>src/main/java/WordCountJob.java</source>
+ </sources>
+</archetype>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..18d1b3b
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,59 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+ <packaging>jar</packaging>
+
+ <name>Your Job's Name</name>
+ <url>http://www.myorganization.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <!-- These two requirements are the minimum to use and develop Flink.
+ You can add others like <artifactId>flink-scala</artifactId> for Scala! -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <!-- We use the maven-jar-plugin to generate a runnable jar that you can
+ submit to your Flink cluster. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <program-class>${package}.Job</program-class>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
new file mode 100644
index 0000000..04e452f
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
@@ -0,0 +1,53 @@
+package ${package};
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Skeleton for a Flink Job.
+ *
+ * For a full example of a Flink Job, see the WordCountJob.java file in the
+ * same package/directory or have a look at the website.
+ *
+ * You can also generate a .jar file that you can submit on your Flink
+ * cluster.
+ * Just type
+ *         mvn clean package
+ * in the project's root directory.
+ * You will find the jar in
+ *         target/${artifactId}-${version}.jar
+ *
+ */
+public class Job {
+
+    public static void main(String[] args) throws Exception {
+        // set up the execution environment
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        /*
+         * Here, you can start creating your execution plan for Flink.
+         *
+         * Start with getting some data from the environment, like
+         *     env.readTextFile(textPath);
+         *
+         * then, transform the resulting DataSet<String> using operations
+         * like
+         *     .filter()
+         *     .flatMap()
+         *     .join()
+         *     .groupBy()
+         * and many more.
+         * Have a look at the programming guide for the Java API:
+         *
+         * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_guide.html
+         *
+         * and the examples
+         *
+         * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_examples.html
+         *
+         */
+
+        // execute program
+        env.execute("Flink Java API Skeleton");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
new file mode 100644
index 0000000..1ecd979
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
@@ -0,0 +1,81 @@
+package ${package};
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * The classic "WordCount" program: computes a simple histogram of word
+ * occurrences over a few hard-coded lines of sample text.
+ *
+ * <p>
+ * The example demonstrates how to:
+ * <ul>
+ * <li>write a simple Flink program,
+ * <li>use Tuple data types, and
+ * <li>write and apply user-defined functions.
+ * </ul>
+ *
+ */
+@SuppressWarnings("serial")
+public class WordCountJob {
+
+    // ------------------------------------------------------------------
+    //  Program
+    // ------------------------------------------------------------------
+
+    public static void main(String[] args) throws Exception {
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // a few lines of sample input
+        DataSet<String> lines = env.fromElements(
+            "To be, or not to be,--that is the question:--",
+            "Whether 'tis nobler in the mind to suffer",
+            "The slings and arrows of outrageous fortune",
+            "Or to take arms against a sea of troubles,"
+        );
+
+        // one (word, 1) pair per word occurrence
+        DataSet<Tuple2<String, Integer>> wordOnes = lines.flatMap(new LineSplitter());
+
+        // group on the word (field 0) and add up the ones (field 1)
+        DataSet<Tuple2<String, Integer>> wordCounts = wordOnes.groupBy(0).aggregate(Aggregations.SUM, 1);
+
+        wordCounts.print();
+
+        env.execute("WordCount Example");
+    }
+
+    // ------------------------------------------------------------------
+    //  User functions
+    // ------------------------------------------------------------------
+
+    /**
+     * User-defined FlatMapFunction that lower-cases a line, splits it on runs of
+     * non-word characters and emits a {@code (word, 1)} pair for every non-empty token.
+     */
+    public static final class LineSplitter extends FlatMapFunction<String, Tuple2<String, Integer>> {
+
+        @Override
+        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+            for (String word : value.toLowerCase().split("\\W+")) {
+                if (word.length() == 0) {
+                    continue; // a line starting with a delimiter yields a leading empty token
+                }
+                out.collect(new Tuple2<String, Integer>(word, 1));
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/pom.xml b/flink-quickstart/flink-quickstart-scala/pom.xml
new file mode 100644
index 0000000..056e095
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/pom.xml
@@ -0,0 +1,19 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-quickstart</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-quickstart-scala</artifactId>
+ <packaging>jar</packaging>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java b/flink-quickstart/flink-quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
new file mode 100644
index 0000000..0192712
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.quickstart;
+
+/**
+ * Intentionally empty placeholder that exists solely to generate
+ * javadocs for the "quickstart-scala" project.
+ **/
+public class Dummy {
+    //
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..87162d7
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<archetype-descriptor xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" name="prj-scala-only"
+ xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <fileSets>
+ <fileSet encoding="UTF-8" filtered="true" packaged="true">
+ <directory>src/main/scala</directory>
+ <includes>
+ <include>**/*.scala</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</archetype-descriptor>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
new file mode 100644
index 0000000..2b4bb84
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
@@ -0,0 +1,7 @@
+<archetype xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
+ <id>flink-quickstart-scala</id>
+ <sources>
+ <source>src/main/scala/Job.scala</source>
+ </sources>
+</archetype>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..0a2589b
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,140 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+ <packaging>jar</packaging>
+
+ <name>Your Job's Name</name>
+ <url>http://www.myorganization.org</url>
+
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <!-- These two requirements are the minimum to use and develop Flink.
+ You can add further Flink modules here as additional dependencies.
+ -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <!-- We use the maven-jar-plugin to generate a runnable jar that you can
+ submit to your Flink cluster.
+ -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <program-class>${package}.Job</program-class>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
new file mode 100644
index 0000000..b179243
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
@@ -0,0 +1,91 @@
+package ${package};
+
+
+import org.apache.flink.api.common.Program
+import org.apache.flink.api.common.ProgramDescription
+import org.apache.flink.client.LocalExecutor
+import org.apache.flink.api.scala.TextFile
+import org.apache.flink.api.scala.ScalaPlan
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.operators._
+import org.apache.flink.client.RemoteExecutor
+
+// You can run this locally using:
+// mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath ${package}.RunJobLocal 2 file:///some/path file:///some/other/path"
+object RunJobLocal {
+  /** Runs the word count plan with the LocalExecutor, or prints usage if arguments are missing. */
+  def main(args: Array[String]) {
+    val job = new Job
+    if (args.size >= 3) {
+      val (parallelism, input, output) = (args(0).toInt, args(1), args(2))
+      LocalExecutor.execute(job.getScalaPlan(parallelism, input, output))
+      System.exit(0)
+    } else {
+      println(job.getDescription)
+    }
+  }
+}
+
+// You can run this on a cluster using:
+// mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath ${package}.RunJobRemote 2 file:///some/path file:///some/other/path"
+object RunJobRemote {
+  def main(args: Array[String]) {
+    val job = new Job
+    if (args.size < 3) {
+      println(job.getDescription)
+      return
+    }
+    val plan = job.getScalaPlan(args(0).toInt, args(1), args(2))
+    // This will create an executor to run the plan on a cluster. We assume
+    // that the JobManager is running on the local machine on port 6123.
+    // Change this according to your configuration.
+    // The jar path is filled in with this project's artifactId and version when
+    // the project is generated from the archetype; update it if you later change
+    // either. Before running this you also need to run "mvn package" to create the jar.
+    val ex = new RemoteExecutor("localhost", 6123, "target/${artifactId}-${version}.jar")
+    ex.executePlan(plan)
+  }
+}
+
+
+/**
+ * This is an outline for a Flink Scala job. It is actually the WordCount
+ * example from the Flink distribution.
+ *
+ * You can run it out of your IDE using the main() method of RunJobLocal.
+ * This will use the LocalExecutor to start a little Flink instance
+ * out of your IDE. RunJobRemote submits the same plan to a cluster.
+ *
+ * You can also generate a .jar file that you can submit on your Flink
+ * cluster.
+ * Just type
+ *         mvn clean package
+ * in the project's root directory.
+ * You will find the jar in
+ *         target/${artifactId}-${version}.jar
+ *
+ */
+class Job extends Program with ProgramDescription with Serializable {
+  override def getDescription() = {
+    "Parameters: [numSubTasks] [input] [output]"
+  }
+  override def getPlan(args: String*) = {
+    getScalaPlan(args(0).toInt, args(1), args(2))
+  }
+
+  // formats one (word, count) result as "word count"
+  def formatOutput = (word: String, count: Int) => "%s %d".format(word, count)
+
+  def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = {
+    val input = TextFile(textInput)
+
+    // lower-case each line, split on non-word characters, drop empty tokens, pair each word with 1
+    val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { (_, 1) } }
+    // add up the ones per word
+    val counts = words groupBy { case (word, _) => word } reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
+
+    // NOTE(review): presumably semantic-property hints for the optimizer - confirm
+    counts neglects { case (word, _) => word }
+    counts preserves({ case (word, _) => word }, { case (word, _) => word })
+    val output = counts.write(wordsOutput, DelimitedOutputFormat(formatOutput.tupled))
+
+    val plan = new ScalaPlan(Seq(output), "Word Count (immutable)")
+    plan.setDefaultParallelism(numSubTasks)
+    plan
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index 9aa75c9..1d08b9d 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -32,8 +32,8 @@
</distributionManagement>
<modules>
- <module>quickstart-java</module>
- <module>quickstart-scala</module>
+ <module>flink-quickstart-java</module>
+ <module>flink-quickstart-scala</module>
</modules>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-SNAPSHOT.sh
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-SNAPSHOT.sh b/flink-quickstart/quickstart-SNAPSHOT.sh
index c3aa914..be0b461 100755
--- a/flink-quickstart/quickstart-SNAPSHOT.sh
+++ b/flink-quickstart/quickstart-SNAPSHOT.sh
@@ -23,12 +23,12 @@ PACKAGE=quickstart
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
- -DarchetypeArtifactId=quickstart-java \
+ -DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=0.6-incubating-SNAPSHOT \
-DgroupId=org.apache.flink \
-DartifactId=$PACKAGE \
-Dversion=0.1 \
- -Dpackage=org.apache.flink \
+ -Dpackage=org.apache.flink \
-DinteractiveMode=false \
-DarchetypeCatalog=https://oss.sonatype.org/content/repositories/snapshots/
@@ -51,6 +51,6 @@ echo -e "\\n\\n"
#
#mvn archetype:generate \
# -DarchetypeGroupId=org.apache.flink \
-# -DarchetypeArtifactId=quickstart-java \
+# -DarchetypeArtifactId=flink-quickstart-java \
# -DarchetypeVersion=0.6-SNAPSHOT \
# -DarchetypeCatalog=https://oss.sonatype.org/content/repositories/snapshots/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/pom.xml b/flink-quickstart/quickstart-java/pom.xml
deleted file mode 100644
index c092657..0000000
--- a/flink-quickstart/quickstart-java/pom.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-quickstart</artifactId>
- <version>0.6-incubating-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>quickstart-java</artifactId>
- <packaging>jar</packaging>
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java b/flink-quickstart/quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
deleted file mode 100644
index 0192712..0000000
--- a/flink-quickstart/quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.quickstart;
-
-/**
- * This class solely exists so that javadocs can be generated
- * for the "quickstart-java" project.
- **/
-public class Dummy {
- // intentionally empty
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/resources/META-INF/maven/archetype.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/resources/META-INF/maven/archetype.xml b/flink-quickstart/quickstart-java/src/main/resources/META-INF/maven/archetype.xml
deleted file mode 100644
index 221c775..0000000
--- a/flink-quickstart/quickstart-java/src/main/resources/META-INF/maven/archetype.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-<archetype xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
- <id>flink-quickstart</id>
- <sources>
- <source>src/main/java/Job.java</source>
- <source>src/main/java/WordCountJob.java</source>
- </sources>
-</archetype>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
deleted file mode 100644
index 18d1b3b..0000000
--- a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>${groupId}</groupId>
- <artifactId>${artifactId}</artifactId>
- <version>${version}</version>
- <packaging>jar</packaging>
-
- <name>Your Job's Name</name>
- <url>http://www.myorganization.org</url>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <!-- These two requirements are the minimum to use and develop Flink.
- You can add others like <artifactId>pact-scala-core</artifactId> for Scala! -->
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>0.6-incubating-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>0.6-incubating-SNAPSHOT</version>
- </dependency>
- </dependencies>
-
- <!-- We use the maven-jar-plugin to generate a runnable jar that you can
- submit to your Flink cluster. -->
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <archive>
- <manifestEntries>
- <program-class>${package}.Job</program-class>
- </manifestEntries>
- </archive>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
deleted file mode 100644
index 5a9b46f..0000000
--- a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package ${package};
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Skeleton for a Flink Job.
- *
- * For a full example of a Flink Job, see the WordCountJob.java file in the
- * same package/directory or have a look at the website.
- *
- * You can also generate a .jar file that you can submit on your Flink
- * cluster.
- * Just type
- * mvn clean package
- * in the project's root directory.
- * You will find the jar in
- * target/[artifactId]-[version].jar (no classifier is configured in the pom)
- *
- */
-public class Job {
-
- public static void main(String[] args) throws Exception {
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-
- /**
- * Here, you can start creating your execution plan for Flink.
- *
- * Start with getting some data from the environment, like
- * env.readTextFile(textPath);
- *
- * then, transform the resulting DataSet<String> using operations
- * like
- * .filter()
- * .flatMap()
- * .join()
- * .groupBy()
- * and many more.
- * Have a look at the programming guide for the Java API:
- *
- * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_guide.html
- *
- * and the examples
- *
- * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_examples.html
- *
- */
-
- // execute program
- env.execute("Stratosphere Java API Skeleton"); // NOTE(review): job name still says "Stratosphere"; consider a Flink name
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java b/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
deleted file mode 100644
index 1ecd979..0000000
--- a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package ${package};
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram
- * over some hard-coded sample data.
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>write a simple Flink program.
- * <li>use Tuple data types.
- * <li>write and use user-defined functions.
- * </ul>
- *
- */
-@SuppressWarnings("serial")
-public class WordCountJob {
-
- //
- // Program.
- //
-
- public static void main(String[] args) throws Exception {
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // get input data (a few fixed lines, so no input file is needed)
- DataSet<String> text = env.fromElements(
- "To be, or not to be,--that is the question:--",
- "Whether 'tis nobler in the mind to suffer",
- "The slings and arrows of outrageous fortune",
- "Or to take arms against a sea of troubles,"
- );
-
- DataSet<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new LineSplitter())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0)
- .aggregate(Aggregations.SUM, 1);
-
- // emit result
- counts.print();
-
- // execute program
- env.execute("WordCount Example");
- }
-
- //
- // User Functions
- //
-
- /**
- * Implements the string tokenizer that splits sentences into words as a user-defined
- * FlatMapFunction. The function takes a line (String) and splits it into
- * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
- */
- public static final class LineSplitter extends FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize (lowercase) and split the line on runs of non-word characters
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) { // split() yields a leading "" when a line starts with a non-word char (e.g. "'tis")
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala-SNAPSHOT.sh
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala-SNAPSHOT.sh b/flink-quickstart/quickstart-scala-SNAPSHOT.sh
index 247e8b4..e8e1c58 100755
--- a/flink-quickstart/quickstart-scala-SNAPSHOT.sh
+++ b/flink-quickstart/quickstart-scala-SNAPSHOT.sh
@@ -23,7 +23,7 @@ PACKAGE=quickstart
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
- -DarchetypeArtifactId=quickstart-scala \
+ -DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=0.6-incubating-SNAPSHOT \
-DgroupId=org.apache.flink \
-DartifactId=$PACKAGE \
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala/pom.xml b/flink-quickstart/quickstart-scala/pom.xml
deleted file mode 100644
index 030bd17..0000000
--- a/flink-quickstart/quickstart-scala/pom.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-quickstart</artifactId>
- <version>0.6-incubating-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>quickstart-scala</artifactId>
- <packaging>jar</packaging>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java b/flink-quickstart/quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
deleted file mode 100644
index 0192712..0000000
--- a/flink-quickstart/quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.quickstart;
-
-/**
- * This class solely exists so that javadocs can be generated
- * for the "quickstart-scala" project.
- **/
-public class Dummy {
- // intentionally empty
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
deleted file mode 100644
index 87162d7..0000000
--- a/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ /dev/null
@@ -1,26 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<archetype-descriptor xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" name="prj-scala-only"
- xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <fileSets>
- <fileSet encoding="UTF-8" filtered="true" packaged="true">
- <directory>src/main/scala</directory>
- <includes>
- <include>**/*.scala</include>
- </includes>
- </fileSet>
- </fileSets>
-</archetype-descriptor>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype.xml b/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
deleted file mode 100644
index 2b4bb84..0000000
--- a/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-<archetype xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
- <id>flink-quickstart-scala</id>
- <sources>
- <source>src/main/scala/Job.scala</source>
- </sources>
-</archetype>
\ No newline at end of file