You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:37 UTC

[58/73] [abbrv] prefix all projects in addons and quickstarts with flink-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Client.java
deleted file mode 100644
index 6d4c7b5..0000000
--- a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.jar.JarFile;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-
-/**
- * All classes in this package contain code taken from
- * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
- * and
- * https://github.com/hortonworks/simple-yarn-app
- * and 
- * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
- * 
- * The Flink jar is uploaded to HDFS by this client. 
- * The application master and all the TaskManager containers get the jar file downloaded
- * by YARN into their local fs.
- * 
- */
-public class Client {
-	private static final Log LOG = LogFactory.getLog(Client.class);
-	
-	/**
-	 * Command Line argument options
-	 */
-	private static final Option QUERY = new Option("q","query",false, "Display available YARN resources (memory, cores)");
-	// --- or ---
-	private static final Option VERBOSE = new Option("v","verbose",false, "Verbose debug mode");
-	private static final Option GEN_CONF = new Option("g","generateConf",false, "Place default configuration file in current directory");
-	private static final Option QUEUE = new Option("qu","queue",true, "Specify YARN queue.");
-	private static final Option SHIP_PATH = new Option("s","ship",true, "Ship files in the specified directory");
-	private static final Option FLINK_CONF_DIR = new Option("c","confDir",true, "Path to Flink configuration directory");
-	private static final Option FLINK_JAR = new Option("j","jar",true, "Path to Flink jar file");
-	private static final Option JM_MEMORY = new Option("jm","jobManagerMemory",true, "Memory for JobManager Container [in MB]");
-	private static final Option TM_MEMORY = new Option("tm","taskManagerMemory",true, "Memory per TaskManager Container [in MB]");
-	private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
-	private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
-			+ " TaskTrackers)");
-	
-	/**
-	 * Constants
-	 */
-	// environment variable names 
-	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-	public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
-	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-	public final static String ENV_APP_ID = "_APP_ID";
-	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
-	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
-	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
-	
-	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-
-	
-	
-	private Configuration conf;
-
-	public void run(String[] args) throws Exception {
-		
-		if(UserGroupInformation.isSecurityEnabled()) {
-			throw new RuntimeException("Flink YARN client does not have security support right now."
-					+ "File a bug, we will fix it asap");
-		}
-		//Utils.logFilesInCurrentDirectory(LOG);
-		//
-		//	Command Line Options
-		//
-		Options options = new Options();
-		options.addOption(VERBOSE);
-		options.addOption(FLINK_CONF_DIR);
-		options.addOption(FLINK_JAR);
-		options.addOption(JM_MEMORY);
-		options.addOption(TM_MEMORY);
-		options.addOption(TM_CORES);
-		options.addOption(CONTAINER);
-		options.addOption(GEN_CONF);
-		options.addOption(QUEUE);
-		options.addOption(QUERY);
-		options.addOption(SHIP_PATH);
-		
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd = null;
-		try {
-			cmd = parser.parse( options, args);
-		} catch(MissingOptionException moe) {
-			System.out.println(moe.getMessage());
-			printUsage();
-			System.exit(1);
-		}
-		
-		if (System.getProperty("log4j.configuration") == null) {
-			Logger root = Logger.getRootLogger();
-			root.removeAllAppenders();
-			PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
-			ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
-			root.addAppender(appender);
-			if(cmd.hasOption(VERBOSE.getOpt())) {
-				root.setLevel(Level.DEBUG);
-				LOG.debug("CLASSPATH: "+System.getProperty("java.class.path"));
-			} else {
-				root.setLevel(Level.INFO);
-			}
-		}
-		
-		
-		// Jar Path
-		Path localJarPath;
-		if(cmd.hasOption(FLINK_JAR.getOpt())) {
-			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
-			if(!userPath.startsWith("file://")) {
-				userPath = "file://" + userPath;
-			}
-			localJarPath = new Path(userPath);
-		} else {
-			localJarPath = new Path("file://"+Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
-		}
-		
-		if(cmd.hasOption(GEN_CONF.getOpt())) {
-			LOG.info("Placing default configuration in current directory");
-			File outFile = generateDefaultConf(localJarPath);
-			LOG.info("File written to "+outFile.getAbsolutePath());
-			System.exit(0);
-		}
-		
-		// Conf Path 
-		Path confPath = null;
-		String confDirPath = "";
-		if(cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
-			confDirPath = cmd.getOptionValue(FLINK_CONF_DIR.getOpt())+"/";
-			File confFile = new File(confDirPath+CONFIG_FILE_NAME);
-			if(!confFile.exists()) {
-				LOG.fatal("Unable to locate configuration file in "+confFile);
-				System.exit(1);
-			}
-			confPath = new Path(confFile.getAbsolutePath());
-		} else {
-			System.out.println("No configuration file has been specified");
-			
-			// no configuration path given.
-			// -> see if there is one in the current directory
-			File currDir = new File(".");
-			File[] candidates = currDir.listFiles(new FilenameFilter() {
-				@Override
-				public boolean accept(final File dir, final String name) {
-					return name != null && name.endsWith(".yaml");
-				}
-			});
-			if(candidates == null || candidates.length == 0) {
-				System.out.println("No configuration file has been found in current directory.\n"
-						+ "Copying default.");
-				File outFile = generateDefaultConf(localJarPath);
-				confPath = new Path(outFile.toURI());
-			} else {
-				if(candidates.length > 1) {
-					System.out.println("Multiple .yaml configuration files were found in the current directory\n"
-							+ "Please specify one explicitly");
-					System.exit(1);
-				} else if(candidates.length == 1) {
-					confPath = new Path(candidates[0].toURI());
-				} 
-			}
-		}
-		List<File> shipFiles = new ArrayList<File>();
-		// path to directory to ship
-		if(cmd.hasOption(SHIP_PATH.getOpt())) {
-			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
-			File shipDir = new File(shipPath);
-			if(shipDir.isDirectory()) {
-				shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
-					@Override
-					public boolean accept(File dir, String name) {
-						return !(name.equals(".") || name.equals("..") );
-					}
-				})));
-			} else {
-				LOG.warn("Ship directory is not a directory!");
-			}
-		}
-		boolean hasLog4j = false;
-		//check if there is a log4j file
-		if(confDirPath.length() > 0) {
-			File l4j = new File(confDirPath+"/log4j.properties");
-			if(l4j.exists()) {
-				shipFiles.add(l4j);
-				hasLog4j = true;
-			}
-		}
-		
-		// queue
-		String queue = "default";
-		if(cmd.hasOption(QUEUE.getOpt())) {
-			queue = cmd.getOptionValue(QUEUE.getOpt());
-		}
-		
-		// JobManager Memory
-		int jmMemory = 512;
-		if(cmd.hasOption(JM_MEMORY.getOpt())) {
-			jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
-		}
-		
-		// Task Managers memory
-		int tmMemory = 1024;
-		if(cmd.hasOption(TM_MEMORY.getOpt())) {
-			tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
-		}
-		
-		// Task Managers vcores
-		int tmCores = 1;
-		if(cmd.hasOption(TM_CORES.getOpt())) {
-			tmCores = Integer.valueOf(cmd.getOptionValue(TM_CORES.getOpt()));
-		}
-		Utils.getFlinkConfiguration(confPath.toUri().getPath());
-		int jmPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0);
-		if(jmPort == 0) {
-			LOG.warn("Unable to find job manager port in configuration!");
-			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
-		}
-		conf = Utils.initializeYarnConfiguration();
-		
-		// intialize HDFS
-		LOG.info("Copy App Master jar from local filesystem and add to local environment");
-		// Copy the application master jar to the filesystem 
-		// Create a local resource to point to the destination jar path 
-		final FileSystem fs = FileSystem.get(conf);
-		
-		if(fs.getScheme().startsWith("file")) {
-			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
-					+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
-					+ "The Flink YARN client needs to store its files in a distributed file system");
-		}
-		
-		// Create yarnClient
-		final YarnClient yarnClient = YarnClient.createYarnClient();
-		yarnClient.init(conf);
-		yarnClient.start();
-		
-		// Query cluster for metrics
-		if(cmd.hasOption(QUERY.getOpt())) {
-			showClusterMetrics(yarnClient);
-		}
-		if(!cmd.hasOption(CONTAINER.getOpt())) {
-			LOG.fatal("Missing required argument "+CONTAINER.getOpt());
-			printUsage();
-			yarnClient.stop();
-			System.exit(1);
-		}
-		
-		// TM Count
-		final int taskManagerCount = Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()));
-		
-		System.out.println("Using values:");
-		System.out.println("\tContainer Count = "+taskManagerCount);
-		System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
-		System.out.println("\tConfiguration file = "+confPath.toUri().getPath());
-		System.out.println("\tJobManager memory = "+jmMemory);
-		System.out.println("\tTaskManager memory = "+tmMemory);
-		System.out.println("\tTaskManager cores = "+tmCores);
-
-		// Create application via yarnClient
-		YarnClientApplication app = yarnClient.createApplication();
-		GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
-		Resource maxRes = appResponse.getMaximumResourceCapability();
-		if(tmMemory > maxRes.getMemory() || tmCores > maxRes.getVirtualCores()) {
-			LOG.fatal("The cluster does not have the requested resources for the TaskManagers available!\n"
-					+ "Maximum Memory: "+maxRes.getMemory() +", Maximum Cores: "+tmCores);
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if(jmMemory > maxRes.getMemory() ) {
-			LOG.fatal("The cluster does not have the requested resources for the JobManager available!\n"
-					+ "Maximum Memory: "+maxRes.getMemory());
-			yarnClient.stop();
-			System.exit(1);
-		}
-		int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount;
-		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
-		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
-			LOG.fatal("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
-					+ "There are currently only "+freeClusterMem.totalFreeMemory+"MB available.");
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if( tmMemory > freeClusterMem.containerLimit) {
-			LOG.fatal("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
-					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if( jmMemory > freeClusterMem.containerLimit) {
-			LOG.fatal("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
-					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
-			yarnClient.stop();
-			System.exit(1);
-		}
-		
-		// respect custom JVM options in the YAML file
-		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-		
-		// Set up the container launch context for the application master
-		ContainerLaunchContext amContainer = Records
-				.newRecord(ContainerLaunchContext.class);
-		
-		String amCommand = "$JAVA_HOME/bin/java"
-					+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
-		if(hasLog4j) {
-			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
-		}
-		amCommand 	+= " org.apache.flink.yarn.ApplicationMaster" + " "
-					+ " 1>"
-					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
-					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
-		amContainer.setCommands(Collections.singletonList(amCommand));
-		
-		System.err.println("amCommand="+amCommand);
-		
-		// Set-up ApplicationSubmissionContext for the application
-		ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
-		final ApplicationId appId = appContext.getApplicationId();
-		
-		// Setup jar for ApplicationMaster
-		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-		Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), localJarPath, appMasterJar, fs.getHomeDirectory());
-		Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), confPath, flinkConf, fs.getHomeDirectory());
-		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
-		localResources.put("flink.jar", appMasterJar);
-		localResources.put("flink-conf.yaml", flinkConf);
-		
-		
-		// setup security tokens (code from apache storm)
-		final Path[] paths = new Path[3 + shipFiles.size()];
-		StringBuffer envShipFileList = new StringBuffer();
-		// upload ship files
-		for (int i = 0; i < shipFiles.size(); i++) {
-			File shipFile = shipFiles.get(i);
-			LocalResource shipResources = Records.newRecord(LocalResource.class);
-			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-			paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
-					shipLocalPath, shipResources, fs.getHomeDirectory());
-			localResources.put(shipFile.getName(), shipResources);
-			
-			envShipFileList.append(paths[3 + i]);
-			if(i+1 < shipFiles.size()) {
-				envShipFileList.append(',');
-			}
-		}
-
-		paths[0] = remotePathJar;
-		paths[1] = remotePathConf;
-		paths[2] = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
-		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-		fs.setPermission(paths[2], permission); // set permission for path.
-		Utils.setTokensFor(amContainer, paths, this.conf);
-		
-		 
-		amContainer.setLocalResources(localResources);
-		fs.close();
-
-		// Setup CLASSPATH for ApplicationMaster
-		Map<String, String> appMasterEnv = new HashMap<String, String>();
-		Utils.setupEnv(conf, appMasterEnv);
-		// set configuration values
-		appMasterEnv.put(Client.ENV_TM_COUNT, String.valueOf(taskManagerCount));
-		appMasterEnv.put(Client.ENV_TM_CORES, String.valueOf(tmCores));
-		appMasterEnv.put(Client.ENV_TM_MEMORY, String.valueOf(tmMemory));
-		appMasterEnv.put(Client.FLINK_JAR_PATH, remotePathJar.toString() );
-		appMasterEnv.put(Client.ENV_APP_ID, appId.toString());
-		appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
-		appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
-		appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-		
-		amContainer.setEnvironment(appMasterEnv);
-		
-		// Set up resource type requirements for ApplicationMaster
-		Resource capability = Records.newRecord(Resource.class);
-		capability.setMemory(jmMemory);
-		capability.setVirtualCores(1);
-		
-		appContext.setApplicationName("Flink"); // application name
-		appContext.setAMContainerSpec(amContainer);
-		appContext.setResource(capability);
-		appContext.setQueue(queue);
-		
-		// file that we write into the conf/ dir containing the jobManager address.
-		final File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
-		
-		Runtime.getRuntime().addShutdownHook(new Thread() {
-		@Override
-		public void run() {
-			try {
-				LOG.info("Killing the Flink-YARN application.");
-				yarnClient.killApplication(appId);
-				LOG.info("Deleting files in "+paths[2]);
-				FileSystem shutFS = FileSystem.get(conf);
-				shutFS.delete(paths[2], true); // delete conf and jar file.
-				shutFS.close();
-			} catch (Exception e) {
-				LOG.warn("Exception while killing the YARN application", e);
-			}
-			try {
-				addrFile.delete();
-			} catch (Exception e) {
-				LOG.warn("Exception while deleting the jobmanager address file", e);
-			}
-			LOG.info("YARN Client is shutting down");
-			yarnClient.stop();
-		}
-		});
-		
-		LOG.info("Submitting application master " + appId);
-		yarnClient.submitApplication(appContext);
-		ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-		YarnApplicationState appState = appReport.getYarnApplicationState();
-		boolean told = false;
-		char[] el = { '/', '|', '\\', '-'};
-		int i = 0; 
-		while (appState != YarnApplicationState.FINISHED
-				&& appState != YarnApplicationState.KILLED
-				&& appState != YarnApplicationState.FAILED) {
-			if(!told && appState ==  YarnApplicationState.RUNNING) {
-				System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
-				System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
-				// write jobmanager connect information
-				
-				PrintWriter out = new PrintWriter(addrFile);
-				out.println(appReport.getHost()+":"+jmPort);
-				out.close();
-				addrFile.setReadable(true, false); // readable for all.
-				told = true;
-			}
-			if(!told) {
-				System.err.print(el[i++]+"\r");
-				if(i == el.length) {
-					i = 0;
-				}
-				Thread.sleep(500); // wait for the application to switch to RUNNING
-			} else {
-				Thread.sleep(5000);
-			}
-			
-			appReport = yarnClient.getApplicationReport(appId);
-			appState = appReport.getYarnApplicationState();
-		}
-
-		LOG.info("Application " + appId + " finished with"
-				+ " state " + appState + " at " + appReport.getFinishTime());
-		if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
-			LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
-		}
-		
-	}
-	private static class ClusterResourceDescription {
-		public int totalFreeMemory;
-		public int containerLimit;
-	}
-	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
-		ClusterResourceDescription crd = new ClusterResourceDescription();
-		crd.totalFreeMemory = 0;
-		crd.containerLimit = 0;
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-		for(NodeReport rep : nodes) {
-			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
-			crd.totalFreeMemory += free;
-			if(free > crd.containerLimit) {
-				crd.containerLimit = free;
-			}
-		}
-		return crd;
-	}
-
-	private void printUsage() {
-		System.out.println("Usage:");
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setWidth(200);
-		formatter.setLeftPadding(5);
-		formatter.setSyntaxPrefix("   Required");
-		Options req = new Options();
-		req.addOption(CONTAINER);
-		formatter.printHelp(" ", req);
-		
-		formatter.setSyntaxPrefix("   Optional");
-		Options opt = new Options();
-		opt.addOption(VERBOSE);
-	//	opt.addOption(GEN_CONF);
-	//	opt.addOption(STRATOSPHERE_CONF);
-	//	opt.addOption(STRATOSPHERE_JAR);
-		opt.addOption(JM_MEMORY);
-		opt.addOption(TM_MEMORY);
-		opt.addOption(TM_CORES);
-		opt.addOption(QUERY);
-		opt.addOption(QUEUE);
-		formatter.printHelp(" ", opt);
-	}
-
-	private void showClusterMetrics(YarnClient yarnClient)
-			throws YarnException, IOException {
-		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-		System.out.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-		final String format = "|%-16s |%-16s %n";
-		System.out.printf("|Property         |Value          %n");
-		System.out.println("+---------------------------------------+");
-		int totalMemory = 0;
-		int totalCores = 0;
-		for(NodeReport rep : nodes) {
-			final Resource res = rep.getCapability();
-			totalMemory += res.getMemory();
-			totalCores += res.getVirtualCores();
-			System.out.format(format, "NodeID", rep.getNodeId());
-			System.out.format(format, "Memory", res.getMemory()+" MB");
-			System.out.format(format, "vCores", res.getVirtualCores());
-			System.out.format(format, "HealthReport", rep.getHealthReport());
-			System.out.format(format, "Containers", rep.getNumContainers());
-			System.out.println("+---------------------------------------+");
-		}
-		System.out.println("Summary: totalMemory "+totalMemory+" totalCores "+totalCores);
-		List<QueueInfo> qInfo = yarnClient.getAllQueues();
-		for(QueueInfo q : qInfo) {
-			System.out.println("Queue: "+q.getQueueName()+", Current Capacity: "+q.getCurrentCapacity()+" Max Capacity: "+q.getMaximumCapacity()+" Applications: "+q.getApplications().size());
-		}
-		yarnClient.stop();
-		System.exit(0);
-	}
-
-	private File generateDefaultConf(Path localJarPath) throws IOException,
-			FileNotFoundException {
-		JarFile jar = null;
-		try {
-			jar = new JarFile(localJarPath.toUri().getPath());
-		} catch(FileNotFoundException fne) {
-			LOG.fatal("Unable to access jar file. Specify jar file or configuration file.", fne);
-			System.exit(1);
-		}
-		InputStream confStream = jar.getInputStream(jar.getEntry("flink-conf.yaml"));
-		
-		if(confStream == null) {
-			LOG.warn("Given jar file does not contain yaml conf.");
-			confStream = this.getClass().getResourceAsStream("flink-conf.yaml"); 
-			if(confStream == null) {
-				throw new RuntimeException("Unable to find flink-conf in jar file");
-			}
-		}
-		File outFile = new File("flink-conf.yaml");
-		if(outFile.exists()) {
-			throw new RuntimeException("File unexpectedly exists");
-		}
-		FileOutputStream outputStream = new FileOutputStream(outFile);
-		int read = 0;
-		byte[] bytes = new byte[1024];
-		while ((read = confStream.read(bytes)) != -1) {
-			outputStream.write(bytes, 0, read);
-		}
-		confStream.close(); outputStream.close(); jar.close();
-		return outFile;
-	}
-
-	public static void main(String[] args) throws Exception {
-		Client c = new Client();
-		c.run(args);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Utils.java
deleted file mode 100644
index b72b9bc..0000000
--- a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.yarn;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringInterner;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
-public class Utils {
-	
-	private static final Log LOG = LogFactory.getLog(Utils.class);
-	private static final int HEAP_LIMIT_CAP = 500;
-	
-
-	public static void copyJarContents(String prefix, String pathToJar) throws IOException {
-		LOG.info("Copying jar (location: "+pathToJar+") to prefix "+prefix);
-		
-		JarFile jar = null;
-		jar = new JarFile(pathToJar);
-		Enumeration<JarEntry> enumr = jar.entries();
-		byte[] bytes = new byte[1024];
-		while(enumr.hasMoreElements()) {
-			JarEntry entry = enumr.nextElement();
-			if(entry.getName().startsWith(prefix)) {
-				if(entry.isDirectory()) {
-					File cr = new File(entry.getName());
-					cr.mkdirs();
-					continue;
-				}
-				InputStream inStream = jar.getInputStream(entry);
-				File outFile = new File(entry.getName());
-				if(outFile.exists()) {
-					throw new RuntimeException("File unexpectedly exists");
-				}
-				FileOutputStream outputStream = new FileOutputStream(outFile);
-				int read = 0;
-				while ((read = inStream.read(bytes)) != -1) {
-					outputStream.write(bytes, 0, read);
-				}
-				inStream.close(); outputStream.close(); 
-			}
-		}
-		jar.close();
-	}
-	
-	/**
-	 * Calculate the heap size for the JVMs to start in the containers.
-	 * Since JVMs are allocating more than just the heap space, and YARN is very
-	 * fast at killing processes that use memory beyond their limit, we have to come
-	 * up with a good heapsize.
-	 * This code takes 85% of the given amount of memory (in MB). If the amount we removed by these 85%
-	 * more than 500MB (the current HEAP_LIMIT_CAP), we'll just subtract 500 MB.
-	 * 
-	 */
-	public static int calculateHeapSize(int memory) {
-		int heapLimit = (int)((float)memory*0.85);
-		if( (memory - heapLimit) > HEAP_LIMIT_CAP) {
-			heapLimit = memory-HEAP_LIMIT_CAP;
-		}
-		return heapLimit;
-	}
-	
-	public static void getFlinkConfiguration(String confDir) {
-		GlobalConfiguration.loadConfiguration(confDir);
-	}
-	
-	private static void addPathToConfig(Configuration conf, File path) {
-		// chain-in a new classloader
-		URL fileUrl = null;
-		try {
-			fileUrl = path.toURL();
-		} catch (MalformedURLException e) {
-			throw new RuntimeException("Erroneous config file path", e);
-		}
-		URL[] urls = {fileUrl};
-		ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader());
-		conf.setClassLoader(cl);
-	}
-	
-	private static void setDefaultConfValues(Configuration conf) {
-		if(conf.get("fs.hdfs.impl",null) == null) {
-			conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-		}
-		if(conf.get("fs.file.impl",null) == null) {
-			conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
-		}
-	}
-	
-	public static Configuration initializeYarnConfiguration() {
-		Configuration conf = new YarnConfiguration();
-		String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-		if(configuredHadoopConfig != null) {
-			LOG.info("Using hadoop configuration path from " + ConfigConstants.PATH_HADOOP_CONFIG + " setting.");
-			addPathToConfig(conf, new File(configuredHadoopConfig));
-			setDefaultConfValues(conf);
-			return conf;
-		}
-		String[] envs = { "YARN_CONF_DIR", "HADOOP_CONF_DIR", "HADOOP_CONF_PATH" };
-		for(int i = 0; i < envs.length; ++i) {
-			String confPath = System.getenv(envs[i]);
-			if (confPath != null) {
-				LOG.info("Found "+envs[i]+", adding it to configuration");
-				addPathToConfig(conf, new File(confPath));
-				setDefaultConfValues(conf);
-				return conf;
-			}
-		}
-		LOG.info("Could not find HADOOP_CONF_PATH, using HADOOP_HOME.");
-		String hadoopHome = null;
-		try {
-			hadoopHome = Shell.getHadoopHome();
-		} catch (IOException e) {
-			LOG.fatal("Unable to get hadoop home. Please set HADOOP_HOME variable!", e);
-			System.exit(1);
-		}
-		File tryConf = new File(hadoopHome+"/etc/hadoop");
-		if(tryConf.exists()) {
-			LOG.info("Found configuration using hadoop home.");
-			addPathToConfig(conf, tryConf);
-		} else {
-			tryConf = new File(hadoopHome+"/conf");
-			if(tryConf.exists()) {
-				addPathToConfig(conf, tryConf);
-			}
-		}
-		setDefaultConfValues(conf);
-		return conf;
-	}
-	
-	public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
-		for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-			addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
-		}
-		addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
-	}
-	
-	
-	/**
-	 * 
-	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
-	 */
-	public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir)
-			throws IOException {
-		// copy to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
-		
-		Path dst = new Path(homedir, suffix);
-		
-		LOG.info("Copying from "+localRsrcPath+" to "+dst );
-		fs.copyFromLocalFile(localRsrcPath, dst);
-		registerLocalResource(fs, dst, appMasterJar);
-		return dst;
-	}
-	
-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
-		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
-		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
-		localResource.setSize(jarStat.getLen());
-		localResource.setTimestamp(jarStat.getModificationTime());
-		localResource.setType(LocalResourceType.FILE);
-		localResource.setVisibility(LocalResourceVisibility.PUBLIC);
-	}
-
-	public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
-		Credentials credentials = new Credentials();
-		// for HDFS
-		TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
-		// for user
-		UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
-		
-		Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
-		for(Token<? extends TokenIdentifier> token : usrTok) {
-			final Text id = new Text(token.getIdentifier());
-			LOG.info("Adding user token "+id+" with "+token);
-			credentials.addToken(id, token);
-		}
-		DataOutputBuffer dob = new DataOutputBuffer();
-		credentials.writeTokenStorageToStream(dob);
-		LOG.debug("Wrote tokens. Credentials buffer length: "+dob.getLength());
-		
-		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-		amContainer.setTokens(securityTokens);
-	}
-	
-	public static void logFilesInCurrentDirectory(final Log logger) {
-		new File(".").list(new FilenameFilter() {
-			
-			@Override
-			public boolean accept(File dir, String name) {
-				logger.info(dir.getAbsolutePath()+"/"+name);
-				return true;
-			}
-		});
-	}
-	
-	/**
-	 * Copied method from org.apache.hadoop.yarn.util.Apps
-	 * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
-	 * by https://issues.apache.org/jira/browse/YARN-1931
-	 */
-	public static void addToEnvironment(Map<String, String> environment,
-			String variable, String value) {
-		String val = environment.get(variable);
-		if (val == null) {
-			val = value;
-		} else {
-			val = val + File.pathSeparator + value;
-		}
-		environment.put(StringInterner.weakIntern(variable),
-				StringInterner.weakIntern(val));
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-addons/yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
deleted file mode 100644
index b541317..0000000
--- a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-
-public class YarnTaskManagerRunner {
-	
-	private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
-	
-	public static void main(final String[] args) throws IOException {
-		Map<String, String> envs = System.getenv();
-		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
-		
-		// configure local directory
-		final String[] newArgs = Arrays.copyOf(args, args.length + 2);
-		newArgs[newArgs.length-2] = "-"+TaskManager.ARG_CONF_DIR;
-		newArgs[newArgs.length-1] = localDirs;
-		LOG.info("Setting log path "+localDirs);
-		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
-				+ " user to execute Flink TaskManager to '"+yarnClientUsername+"'");
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
-		}
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					TaskManager.main(newArgs);
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
-				return null;
-			}
-		});
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 8127d50..efbff93 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -68,13 +68,13 @@
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>spargel</artifactId>
+			<artifactId>flink-spargel</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>avro</artifactId>
+			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -117,7 +117,7 @@
 			<dependencies>
 				<dependency>
 					<groupId>org.apache.flink</groupId>
-					<artifactId>yarn</artifactId>
+					<artifactId>flink-yarn</artifactId>
 					<version>${project.version}</version>
 				</dependency>
 			</dependencies>
@@ -170,7 +170,7 @@
 			<dependencies>
 				<dependency>
 					<groupId>org.apache.flink</groupId>
-					<artifactId>hbase</artifactId>
+					<artifactId>flink-hbase</artifactId>
 					<version>${project.version}</version>
 				</dependency>
 			</dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/pom.xml b/flink-quickstart/flink-quickstart-java/pom.xml
new file mode 100644
index 0000000..f51bf7f
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/pom.xml
@@ -0,0 +1,19 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <parent>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-quickstart</artifactId>
+    <version>0.6-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-quickstart-java</artifactId>
+  <packaging>jar</packaging>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java b/flink-quickstart/flink-quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
new file mode 100644
index 0000000..0192712
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.quickstart;
+
+/**
+ * This class solely exists to generate
+ * javadocs for the "quickstart-java" project.
+ **/
+public class Dummy {
+	//
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/resources/META-INF/maven/archetype.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/META-INF/maven/archetype.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/META-INF/maven/archetype.xml
new file mode 100644
index 0000000..221c775
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/META-INF/maven/archetype.xml
@@ -0,0 +1,8 @@
+<archetype xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
+  <id>flink-quickstart</id>
+  <sources>
+    <source>src/main/java/Job.java</source>
+    <source>src/main/java/WordCountJob.java</source>
+  </sources>
+</archetype>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..18d1b3b
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,59 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	
+	<groupId>${groupId}</groupId>
+	<artifactId>${artifactId}</artifactId>
+	<version>${version}</version>
+	<packaging>jar</packaging>
+
+	<name>Your Job's Name</name>
+	<url>http://www.myorganization.org</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<!-- These two requirements are the minimum to use and develop Flink. 
+		You can add others like <artifactId>flink-scala</artifactId> for Scala! -->
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>0.6-incubating-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>0.6-incubating-SNAPSHOT</version>
+		</dependency>
+	</dependencies>
+
+	<!-- We use the maven-jar-plugin to generate a runnable jar that you can 
+		submit to your Flink cluster. -->
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<archive>
+						<manifestEntries>
+							<program-class>${package}.Job</program-class>
+						</manifestEntries>
+					</archive>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
new file mode 100644
index 0000000..04e452f
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
@@ -0,0 +1,53 @@
+package ${package};
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Skeleton for a Flink Job.
+ *
+ * For a full example of a Flink Job, see the WordCountJob.java file in the
+ * same package/directory or have a look at the website.
+ *
+ * You can also generate a .jar file that you can submit on your Flink
+ * cluster.
+ * Just type
+ * 		mvn clean package
+ * in the project's root directory.
+ * You will find the jar in
+ * 		target/[artifactId]-[version].jar
+ *
+ */
+public class Job {
+
+	public static void main(String[] args) throws Exception {
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+
+		/**
+		 * Here, you can start creating your execution plan for Flink.
+		 *
+		 * Start with getting some data from the environment, like
+		 * 	env.readTextFile(textPath);
+		 *
+		 * then, transform the resulting DataSet<String> using operations
+		 * like
+		 * 	.filter()
+		 * 	.flatMap()
+		 * 	.join()
+		 * 	.group()
+		 * and many more.
+		 * Have a look at the programming guide for the Java API:
+		 *
+		 * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_guide.html
+		 *
+		 * and the examples
+		 *
+		 * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_examples.html
+		 *
+		 */
+
+		// execute program
+		env.execute("Flink Java API Skeleton");
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
new file mode 100644
index 0000000..1ecd979
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
@@ -0,0 +1,81 @@
+package ${package};
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over some sample data.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ */
+@SuppressWarnings("serial")
+public class WordCountJob {
+
+	//
+	//	Program.
+	//
+
+	public static void main(String[] args) throws Exception {
+
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataSet<String> text = env.fromElements(
+				"To be, or not to be,--that is the question:--",
+				"Whether 'tis nobler in the mind to suffer",
+				"The slings and arrows of outrageous fortune",
+				"Or to take arms against a sea of troubles,"
+				);
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new LineSplitter())
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.aggregate(Aggregations.SUM, 1);
+
+		// emit result
+		counts.print();
+
+		// execute program
+		env.execute("WordCount Example");
+	}
+
+	//
+	// 	User Functions
+	//
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined
+	 * FlatMapFunction. The function takes a line (String) and splits it into
+	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 */
+	public static final class LineSplitter extends FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/pom.xml b/flink-quickstart/flink-quickstart-scala/pom.xml
new file mode 100644
index 0000000..056e095
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/pom.xml
@@ -0,0 +1,19 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <parent>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-quickstart</artifactId>
+    <version>0.6-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-quickstart-scala</artifactId>
+  <packaging>jar</packaging>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java b/flink-quickstart/flink-quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
new file mode 100644
index 0000000..0192712
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.quickstart;
+
+/**
+ * This class solely exists to generate
+ * javadocs for the "quickstart-scala" project.
+ **/
+public class Dummy {
+	//
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..87162d7
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<archetype-descriptor xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" name="prj-scala-only"
+    xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <fileSets>
+    <fileSet encoding="UTF-8" filtered="true" packaged="true">
+      <directory>src/main/scala</directory>
+      <includes>
+        <include>**/*.scala</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+</archetype-descriptor>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
new file mode 100644
index 0000000..2b4bb84
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
@@ -0,0 +1,7 @@
+<archetype xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
+  <id>flink-quickstart-scala</id>
+  <sources>
+    <source>src/main/scala/Job.scala</source>
+  </sources>
+</archetype>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..0a2589b
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,140 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>${groupId}</groupId>
+  <artifactId>${artifactId}</artifactId>
+  <version>${version}</version>
+  <packaging>jar</packaging>
+
+  <name>Your Job's Name</name>
+  <url>http://www.myorganization.org</url>
+
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <!--  These two dependencies are the minimum needed to use and develop
+        Flink programs in Scala. Add further Flink modules here as needed.
+  -->
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala</artifactId>
+      <version>0.6-incubating-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients</artifactId>
+      <version>0.6-incubating-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <!--  We use the maven-jar-plugin to generate a runnable jar that you can
+        submit to your Flink cluster.
+  -->
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <program-class>${package}.Job</program-class>
+            </manifestEntries>
+          </archive>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.1.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Eclipse Integration -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-eclipse-plugin</artifactId>
+        <version>2.8</version>
+        <configuration>
+          <downloadSources>true</downloadSources>
+          <projectnatures>
+            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+          </projectnatures>
+          <buildcommands>
+            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+          </buildcommands>
+          <classpathContainers>
+            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+          </classpathContainers>
+          <excludes>
+            <exclude>org.scala-lang:scala-library</exclude>
+            <exclude>org.scala-lang:scala-compiler</exclude>
+          </excludes>
+          <sourceIncludes>
+            <sourceInclude>**/*.scala</sourceInclude>
+            <sourceInclude>**/*.java</sourceInclude>
+          </sourceIncludes>
+        </configuration>
+      </plugin>
+
+      <!-- Adding scala source directories to build path -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.7</version>
+        <executions>
+          <!-- Add src/main/scala to eclipse build path -->
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+          <!-- Add src/test/scala to eclipse build path -->
+          <execution>
+            <id>add-test-source</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/test/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
new file mode 100644
index 0000000..b179243
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
@@ -0,0 +1,91 @@
+package ${package};
+
+
+import org.apache.flink.api.common.Program
+import org.apache.flink.api.common.ProgramDescription
+import org.apache.flink.client.LocalExecutor
+import org.apache.flink.api.scala.TextFile
+import org.apache.flink.api.scala.ScalaPlan
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.operators._
+import org.apache.flink.client.RemoteExecutor
+
+// Entry point that executes the job's plan with LocalExecutor. You can run this locally using:
+// mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath ${package}.RunJobLocal 2 file:///some/path file:///some/other/path"
+object RunJobLocal {
+  def main(args: Array[String]) {
+    val job = new Job
+    if (args.size < 3) { // need three arguments: parallelism, input path, output path
+      println(job.getDescription)
+      return
+    }
+    val plan = job.getScalaPlan(args(0).toInt, args(1), args(2))
+    LocalExecutor.execute(plan)
+    System.exit(0) // terminate the JVM explicitly after the plan has been executed
+  }
+}
+
+// Entry point that submits the job's plan through a RemoteExecutor. You can run this on a cluster using:
+// mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath ${package}.RunJobRemote 2 file:///some/path file:///some/other/path"
+object RunJobRemote {
+  def main(args: Array[String]) {
+    val job = new Job
+    if (args.size < 3) { // need three arguments: parallelism, input path, output path
+      println(job.getDescription)
+      return
+    }
+    val plan = job.getScalaPlan(args(0).toInt, args(1), args(2))
+    // This creates an executor that runs the plan on a cluster. We assume
+    // that the JobManager is running on the local machine on the default
+    // port. Change the host and port according to your configuration.
+    // You will also need to change the name of the jar if you change the
+    // project name and/or version. Before running this you also need
+    // to run "mvn package" to create the jar.
+    val ex = new RemoteExecutor("localhost", 6123, "target/flink-project-0.1-SNAPSHOT.jar")
+    ex.executePlan(plan)
+  }
+}
+
+
+/**
+ * This is an outline for a Flink Scala job. It is actually the WordCount
+ * example from the Flink distribution.
+ *
+ * You can run it out of your IDE using the main() method of RunJobLocal.
+ * This will use the LocalExecutor to start a little Flink instance
+ * out of your IDE.
+ *
+ * You can also generate a .jar file that you can submit on your Flink
+ * cluster.
+ * Just type
+ *      mvn clean package
+ * in the project's root directory.
+ * You will find the jar in
+ *      target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
+ *
+ */
+class Job extends Program with ProgramDescription with Serializable {
+  override def getDescription() = { // usage string; the runner objects print it when arguments are missing
+    "Parameters: [numSubTasks] [input] [output]"
+  }
+  override def getPlan(args: String*) = { // args: numSubTasks, input path, output path (in that order)
+    getScalaPlan(args(0).toInt, args(1), args(2))
+  }
+  // Formats one (word, count) pair as an output record: the word, a space, then the count.
+  def formatOutput = (word: String, count: Int) => "%s %d".format(word, count)
+  // Builds the word-count plan: reads textInput, writes to wordsOutput, default parallelism numSubTasks.
+  def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = {
+    val input = TextFile(textInput)
+    // Lower-case each line, split on non-word characters, drop empty tokens, emit (word, 1) pairs.
+    val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { (_, 1) } }
+    val counts = words groupBy { case (word, _) => word } reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
+    // NOTE(review): neglects/preserves look like field-usage hints for the optimizer; confirm in the Scala API docs.
+    counts neglects { case (word, _) => word }
+    counts preserves({ case (word, _) => word }, { case (word, _) => word })
+    val output = counts.write(wordsOutput, DelimitedOutputFormat(formatOutput.tupled))
+
+    val plan = new ScalaPlan(Seq(output), "Word Count (immutable)")
+    plan.setDefaultParallelism(numSubTasks)
+    plan
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index 9aa75c9..1d08b9d 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -32,8 +32,8 @@
 	</distributionManagement>
 
 		<modules>
-			<module>quickstart-java</module>
-			<module>quickstart-scala</module>
+			<module>flink-quickstart-java</module>
+			<module>flink-quickstart-scala</module>
 		</modules>
 
 		<profiles>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-SNAPSHOT.sh
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-SNAPSHOT.sh b/flink-quickstart/quickstart-SNAPSHOT.sh
index c3aa914..be0b461 100755
--- a/flink-quickstart/quickstart-SNAPSHOT.sh
+++ b/flink-quickstart/quickstart-SNAPSHOT.sh
@@ -23,12 +23,12 @@ PACKAGE=quickstart
 
 mvn archetype:generate								\
   -DarchetypeGroupId=org.apache.flink				\
-  -DarchetypeArtifactId=quickstart-java				\
+  -DarchetypeArtifactId=flink-quickstart-java		\
   -DarchetypeVersion=0.6-incubating-SNAPSHOT		\
   -DgroupId=org.apache.flink 						\
   -DartifactId=$PACKAGE								\
   -Dversion=0.1										\
-  -Dpackage=org.apache.flink 				\
+  -Dpackage=org.apache.flink 						\
   -DinteractiveMode=false							\
   -DarchetypeCatalog=https://oss.sonatype.org/content/repositories/snapshots/
 
@@ -51,6 +51,6 @@ echo -e "\\n\\n"
 #
 #mvn archetype:generate								\
 #  -DarchetypeGroupId=org.apache.flink				\
-#  -DarchetypeArtifactId=quickstart-java				\
+#  -DarchetypeArtifactId=flink-quickstart-java				\
 #  -DarchetypeVersion=0.6-SNAPSHOT					\
 #  -DarchetypeCatalog=https://oss.sonatype.org/content/repositories/snapshots/

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/pom.xml b/flink-quickstart/quickstart-java/pom.xml
deleted file mode 100644
index c092657..0000000
--- a/flink-quickstart/quickstart-java/pom.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-
-  <parent>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-quickstart</artifactId>
-    <version>0.6-incubating-SNAPSHOT</version>
-    <relativePath>..</relativePath>
-  </parent>
-
-  <artifactId>quickstart-java</artifactId>
-  <packaging>jar</packaging>
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java b/flink-quickstart/quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
deleted file mode 100644
index 0192712..0000000
--- a/flink-quickstart/quickstart-java/src/main/java/org/apache/flink/quickstart/Dummy.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.quickstart;
-
-/**
- * This class solely exists to generate
- * javadocs for the "quickstart-java" project.
- **/
-public class Dummy {
-	//
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/resources/META-INF/maven/archetype.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/resources/META-INF/maven/archetype.xml b/flink-quickstart/quickstart-java/src/main/resources/META-INF/maven/archetype.xml
deleted file mode 100644
index 221c775..0000000
--- a/flink-quickstart/quickstart-java/src/main/resources/META-INF/maven/archetype.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-<archetype xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
-  <id>flink-quickstart</id>
-  <sources>
-    <source>src/main/java/Job.java</source>
-    <source>src/main/java/WordCountJob.java</source>
-  </sources>
-</archetype>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
deleted file mode 100644
index 18d1b3b..0000000
--- a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	
-	<groupId>${groupId}</groupId>
-	<artifactId>${artifactId}</artifactId>
-	<version>${version}</version>
-	<packaging>jar</packaging>
-
-	<name>Your Job's Name</name>
-	<url>http://www.myorganization.org</url>
-
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-	</properties>
-
-	<!-- These two requirements are the minimum to use and develop Flink. 
-		You can add others like <artifactId>pact-scala-core</artifactId> for Scala! -->
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>0.6-incubating-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>0.6-incubating-SNAPSHOT</version>
-		</dependency>
-	</dependencies>
-
-	<!-- We use the maven-jar-plugin to generate a runnable jar that you can 
-		submit to your Flink cluster. -->
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<version>2.4</version>
-				<configuration>
-					<archive>
-						<manifestEntries>
-							<program-class>${package}.Job</program-class>
-						</manifestEntries>
-					</archive>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version>
-				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
deleted file mode 100644
index 5a9b46f..0000000
--- a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package ${package};
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Skeleton for a Flink Job.
- *
- * For a full example of a Flink Job, see the WordCountJob.java file in the
- * same package/directory or have a look at the website.
- *
- * You can also generate a .jar file that you can submit on your Flink
- * cluster.
- * Just type
- * 		mvn clean package
- * in the projects root directory.
- * You will find the jar in
- * 		target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
- *
- */
-public class Job {
-
-	public static void main(String[] args) throws Exception {
-		// set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-
-		/**
-		 * Here, you can start creating your execution plan for Flink.
-		 *
-		 * Start with getting some data from the environment, like
-		 * 	env.readTextFile(textPath);
-		 *
-		 * then, transform the resulting DataSet<String> using operations
-		 * like
-		 * 	.filter()
-		 * 	.flatMap()
-		 * 	.join()
-		 * 	.group()
-		 * and many more.
-		 * Have a look at the programming guide for the Java API:
-		 *
-		 * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_guide.html
-		 *
-		 * and the examples
-		 *
-		 * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_examples.html
-		 *
-		 */
-
-		// execute program
-		env.execute("Stratosphere Java API Skeleton");
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java b/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
deleted file mode 100644
index 1ecd979..0000000
--- a/flink-quickstart/quickstart-java/src/main/resources/archetype-resources/src/main/java/WordCountJob.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package ${package};
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram
- * over some sample data
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>write a simple Flink program.
- * <li>use Tuple data types.
- * <li>write and use user-defined functions.
- * </ul>
- *
- */
-@SuppressWarnings("serial")
-public class WordCountJob {
-
-	//
-	//	Program.
-	//
-
-	public static void main(String[] args) throws Exception {
-
-		// set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataSet<String> text = env.fromElements(
-				"To be, or not to be,--that is the question:--",
-				"Whether 'tis nobler in the mind to suffer",
-				"The slings and arrows of outrageous fortune",
-				"Or to take arms against a sea of troubles,"
-				);
-
-		DataSet<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new LineSplitter())
-				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
-				.aggregate(Aggregations.SUM, 1);
-
-		// emit result
-		counts.print();
-
-		// execute program
-		env.execute("WordCount Example");
-	}
-
-	//
-	// 	User Functions
-	//
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a user-defined
-	 * FlatMapFunction. The function takes a line (String) and splits it into
-	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
-	 */
-	public static final class LineSplitter extends FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala-SNAPSHOT.sh
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala-SNAPSHOT.sh b/flink-quickstart/quickstart-scala-SNAPSHOT.sh
index 247e8b4..e8e1c58 100755
--- a/flink-quickstart/quickstart-scala-SNAPSHOT.sh
+++ b/flink-quickstart/quickstart-scala-SNAPSHOT.sh
@@ -23,7 +23,7 @@ PACKAGE=quickstart
 
 mvn archetype:generate								\
   -DarchetypeGroupId=org.apache.flink 				\
-  -DarchetypeArtifactId=quickstart-scala			\
+  -DarchetypeArtifactId=flink-quickstart-scala		\
   -DarchetypeVersion=0.6-incubating-SNAPSHOT		\
   -DgroupId=org.apache.flink						\
   -DartifactId=$PACKAGE								\

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala/pom.xml b/flink-quickstart/quickstart-scala/pom.xml
deleted file mode 100644
index 030bd17..0000000
--- a/flink-quickstart/quickstart-scala/pom.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-
-  <parent>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-quickstart</artifactId>
-    <version>0.6-incubating-SNAPSHOT</version>
-    <relativePath>..</relativePath>
-  </parent>
-
-  <artifactId>quickstart-scala</artifactId>
-  <packaging>jar</packaging>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java b/flink-quickstart/quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
deleted file mode 100644
index 0192712..0000000
--- a/flink-quickstart/quickstart-scala/src/main/java/org/apache/flink/quickstart/Dummy.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.quickstart;
-
-/**
- * This class solely exists to generate
- * javadocs for the "quickstart-java" project.
- **/
-public class Dummy {
-	//
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
deleted file mode 100644
index 87162d7..0000000
--- a/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ /dev/null
@@ -1,26 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<archetype-descriptor xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" name="prj-scala-only"
-    xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <fileSets>
-    <fileSet encoding="UTF-8" filtered="true" packaged="true">
-      <directory>src/main/scala</directory>
-      <includes>
-        <include>**/*.scala</include>
-      </includes>
-    </fileSet>
-  </fileSets>
-</archetype-descriptor>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype.xml b/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
deleted file mode 100644
index 2b4bb84..0000000
--- a/flink-quickstart/quickstart-scala/src/main/resources/META-INF/maven/archetype.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-<archetype xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
-  <id>flink-quickstart-scala</id>
-  <sources>
-    <source>src/main/scala/Job.scala</source>
-  </sources>
-</archetype>
\ No newline at end of file