Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2014/11/14 15:34:09 UTC

[GitHub] incubator-flink pull request: [FLINK-337] [FLINK-671] Generic Inte...

GitHub user zentol opened a pull request:

    https://github.com/apache/incubator-flink/pull/202

    [FLINK-337] [FLINK-671] Generic Interface / PAPI

    This PR contains the new Generic Language Interface and the Python API built on top of it.
    
    This version hasn't been tested on a cluster yet; that will be done over the weekend. I'm opening the PR now so that reviewing can start earlier, since only minor changes should be necessary to make it work.
    
    I will mark several parts where I would specifically like some input.
    
    Relevant issues:
    Ideally, [FLINK-1040] will be merged before this one, as it removes roughly 600 lines of very much hated code in the PlanBinder.
    
    A while ago the distributed cache was acting up, not maintaining files across subsequent operations. I will verify whether this issue still exists while testing. It would not strictly be a blocking issue; as it stands I could work around it (with the caveat that a few files would remain in the tmp folder).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/incubator-flink papipr

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/202.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #202
    
----
commit 1db16983e2fa784c7fd3ab3e29e32edcf271de7b
Author: zentol <s....@web.de>
Date:   2014-11-14T12:37:24Z

    [FLINK-337] Generic Language Interface

commit f5b10f4fe5c0d78c89ff177808aec3a0d0487489
Author: zentol <s....@web.de>
Date:   2014-11-14T12:39:00Z

    [FLINK-671] Python API

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63969320
  
    @rmetzger I added a commit that resolves the issues in the documentation/examples/absolute filepaths.
    HDFS is still hardcoded though; I can't find any documentation for a "fs.defaultFS" parameter.



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63098715
  
    Wow, this is massive and cool :-)
    
    Can you explain a bit what the generic interface is and how Python plugs into it exactly?



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63110487
  
    ok, let's see...
    
    A) data transmission.
    
    Related classes: Sender, Receiver and Streamer
    Sender and Receiver are low-level classes that deal with de-/serialization and with reading from and writing to the memory-mapped file.
    The Streamer class is one level above them; it is the class a function will primarily use.
    
    For example a map function (implemented as a map partition) would use it like this:
    
    @Override
    public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception {
    	streamer.streamBufferWithoutGroups(values.iterator(), out);
    }
    
    As such the streamer basically decides what happens at runtime.
    It communicates with the external process via UDP, exchanging signals that indicate when to read/write buffers.
    
    B) Plan Binding
    
    Related classes: PlanBinder, OperationInfo
    
    The PlanBinder converts data sent from the external process into an actual flink plan. The sent data has to follow a very specific scheme defined within the class.
    Let's say you have a plan with a single CSV source. In this case, for the sources you would
     1. send 1 as an integer, representing the total number of sources
     2. send "CSV" denoting the type of source
     3. send a unique ID integer
     4. send a tuple containing (filePath,lineDelimiter,fieldDelimiter,type1,...,typeX)
    	the type arguments are exemplary objects, meaning that if you want to read Tuple2<Integer,Integer>, type1 and type2 would be integers
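The four steps above can be sketched from the sending side as follows. This is only an illustration: `csv_source_messages` is a hypothetical helper that does not exist in the PR, and it merely builds the ordered list of values to send instead of writing them to the actual stream. The file path and delimiters are made-up sample values.

```python
def csv_source_messages(source_id, file_path, line_delimiter, field_delimiter, *example_types):
    """Return the values to send, in order, for a plan with a single CSV source.

    Hypothetical helper for illustration; the real wire encoding is not modelled.
    """
    return [
        1,          # step 1: total number of sources
        "CSV",      # step 2: type of source
        source_id,  # step 3: unique ID
        # step 4: one tuple holding path, delimiters and one exemplary object per field
        (file_path, line_delimiter, field_delimiter) + tuple(example_types),
    ]


# Reading Tuple2<Integer,Integer>: both type arguments are exemplary integers.
messages = csv_source_messages(0, "/tmp/input.csv", "\n", ",", 0, 0)
```

The exemplary objects carry only type information; their values (here `0`) are irrelevant.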
    
    These behaviours are completely defined for 
    	parameters (DOP, local execution)
    	data sources (CSV, TEXT, VALUE (fromElements()))
    	data sinks (CSV,TEXT,PRINT)
    	broadcast variables
    
    Operations are a bit different, but they follow the same pattern:
     1. send integer representing the total number of operations
     2. send operation identifier
     3. send all relevant arguments (this is partially implementer-defined)
    
    Between step 2 (exclusive) and step 3 (inclusive) the following methods are called:
    createOperationInfo(identifier) (abstract)
    createXOperation(operationInfo)
    applyXOperation(parameter1,..., parameterX, operationInfo) (abstract, only for UDF-operations)
    
    createOperationInfo is an abstract method whose purpose is to create an <? extends OperationInfo> object
    containing all information necessary to apply a (UDF or non-UDF) operation.
    For a Python map, for example, such an object would contain
     1. ID of set to apply map on
     2. serialized operator
     3. type argument
     4. some additional info :>
    
    This object is passed to createXOperation, which extracts certain parameters from the object based on the identifier (the things you would usually need for this operation; a join, for example, would extract grouping keys among others)
    and calls applyXOperation, which actually applies the operation. It was constructed this way so that the implementer has the freedom to implement operations however they want, like a map as a map partition. I was wondering whether I should remove createXOperation.
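The three-method flow described above could look roughly like this. It is a toy model under stated assumptions, not the PlanBinder code: `BinderSketch`, the dictionary-based info object and the keys `parent_id`, `child_id` and `operator` are all invented for illustration, and data sets are plain Python lists.

```python
class BinderSketch:
    """Hypothetical stand-in for a PlanBinder; names only mirror the description."""

    def __init__(self, sets):
        self.sets = sets  # set ID -> data set (plain lists in this sketch)

    def create_operation_info(self, identifier, raw):
        # Language-specific step: collect everything the operation needs.
        return {"identifier": identifier, **raw}

    def create_map_operation(self, info):
        # Generic step: pull out what a map usually needs, then delegate.
        parent = self.sets[info["parent_id"]]
        self.sets[info["child_id"]] = self.apply_map_operation(parent, info)

    def apply_map_operation(self, data, info):
        # Language-specific step: free to implement the map however it likes,
        # here by handing the whole partition to the operator at once.
        return info["operator"](data)


binder = BinderSketch({0: [1, 2, 3]})
info = binder.create_operation_info(
    "map",
    {"parent_id": 0, "child_id": 1, "operator": lambda part: [x * 2 for x in part]},
)
binder.create_map_operation(info)
result = binder.sets[1]
```

In this sketch the middle method only looks up the parent set and stores the result, which illustrates why one might consider folding it into the other two.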
    
    To plug in Python I had to do very little on the Java side:
     1. create functions (PythonMap, PythonCoGroup etc)
     2. extend Streamer and implement setupProcess (starting python process)
     3. extend OperationInfo (roughly 160 lines; given 16 operations that's about 10 each)
     4. extend PlanBinder (the abstract methods only took 100 lines)
    
    On the external side you mostly need code to
     1. communicate with the stream (Connection.py)
     2. deserialize data following 3 different protocols (Iterator.py)
      1. <type1><record1><type2><record2>...<typeX><recordX>
      2. <type><record1><record2>...<recordX>
      3. <type1><type2><record1T1><record2T2><record3T1><record4T2>
     3. serialize data following 2 different protocols (Collector.py)
      1. <type1><record1><type2><record2>...<typeX><recordX>
      2. <type><record1><record2>...<recordX>
     4. PlanBinder counterpart: turn API calls into an intermediate format and send it to Java
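To make the difference between the first two layouts concrete, here is a small sketch that encodes integer records both ways. The one-byte tag `TYPE_INT` and the 4-byte big-endian integer encoding are assumptions chosen for the example; the actual tags and encodings used by Collector.py and Iterator.py are not shown in this thread. The third (two-type, interleaved) layout is not covered.

```python
import struct

TYPE_INT = b"\x01"  # assumed one-byte type tag, for illustration only


def encode_tag_per_record(records):
    """Layout 1: <type1><record1><type2><record2>..."""
    return b"".join(TYPE_INT + struct.pack(">i", r) for r in records)


def encode_tag_once(records):
    """Layout 2: <type><record1><record2>..."""
    return TYPE_INT + b"".join(struct.pack(">i", r) for r in records)


def decode_tag_once(data):
    """Inverse of encode_tag_once for integer records."""
    if data[:1] != TYPE_INT:
        raise ValueError("unexpected type tag")
    body = data[1:]
    return [struct.unpack(">i", body[i:i + 4])[0] for i in range(0, len(body), 4)]


records = [7, 8, 9]
per_record = encode_tag_per_record(records)  # 3 * (1 + 4) bytes
shared_tag = encode_tag_once(records)        # 1 + 3 * 4 bytes
```

Sending the tag once saves one tag per record but only works when all records in the buffer share a type.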
    
    hmm... I'll leave it at that for now. If I keep writing, too many in-depth implementation details will slip in ^^
    




[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20676920
  
    --- Diff: docs/python_api_guide.md ---
    @@ -0,0 +1,666 @@
    +--- 
    +title: Python Programming Guide
    +---
    +
    +
    +Python Programming Guide
    --- End diff --
    
    Is this not enough?
    
    https://github.com/zentol/incubator-flink/blob/papipr/docs/python_api_guide.md#executing-jobs



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20676745
  
    --- Diff: docs/python_api_guide.md ---
    @@ -0,0 +1,666 @@
    +--- 
    +title: Python Programming Guide
    +---
    +
    +
    +Python Programming Guide
    --- End diff --
    
    I think a paragraph on how to execute a Flink python program would be helpful.



[GitHub] incubator-flink pull request: [FLINK-337] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20361477
  
    --- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java ---
    @@ -0,0 +1,376 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
    + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
    + * License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + */
    +package org.apache.flink.languagebinding.api.java.python;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.operators.GroupReduceOperator;
    +import org.apache.flink.api.java.operators.SortedGrouping;
    +import org.apache.flink.api.java.operators.UnsortedGrouping;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.languagebinding.api.java.common.PlanBinder;
    +import org.apache.flink.languagebinding.api.java.common.OperationInfo;
    +import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo;
    +//CHECKSTYLE.OFF: AvoidStarImport - enum/function import
    +import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo.*;
    +import org.apache.flink.languagebinding.api.java.python.functions.*;
    +//CHECKSTYLE.ON: AvoidStarImport
    +import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
    +import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
    +import org.apache.flink.runtime.filecache.FileCache;
    +
    +/**
    + * This class allows the execution of a Flink plan written in python.
    + */
    +public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
    +	public static final String FLINK_PYTHON_ID = "flink";
    +	public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
    +	public static final String FLINK_PYTHON_EXECUTOR_NAME = "/executor.py";
    +
    +	private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
    +	private static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
    +	private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
    +
    +	private Process process;
    +
    +	/**
    +	 * Entry point for the execution of a python plan.
    +	 *
    +	 * @param args planPath [package1[packageX[|parameter1[parameterX]]]]
    +	 * @throws Exception
    +	 */
    +	public static void main(String[] args) throws Exception {
    +		PythonPlanBinder binder = new PythonPlanBinder();
    +		binder.go(args);
    +	}
    +
    +	private void go(String[] args) throws Exception {
    +		env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		int split = 0;
    +		for (int x = 0; x < args.length; x++) {
    +			if (args[x].compareTo("|") == 0) {
    +				split = x;
    +			}
    +		}
    +
    +		prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
    +		startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
    +		receivePlan();
    +		distributeFiles(env);
    +
    +		env.execute();
    +		close();
    +	}
    +
    +	//=====Setup========================================================================================================
    +	private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
    +		prepareFlinkPythonPackage();
    +
    +		String planPath = filePaths[0];
    +		if (planPath.endsWith("/")) {
    +			planPath = planPath.substring(0, planPath.length() - 1);
    +		}
    +		String tmpPlanPath = FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME;
    +		clearPath(tmpPlanPath);
    +		FileCache.copy(new Path(planPath), new Path(tmpPlanPath), false);
    +
    +		for (int x = 1; x < filePaths.length; x++) {
    +			copyFile(filePaths[x]);
    +		}
    +	}
    +
    +	private void startPython(String[] args) throws IOException {
    +		sets = new HashMap();
    +		StringBuilder argsBuilder = new StringBuilder();
    +		for (String arg : args) {
    +			argsBuilder.append(" ").append(arg);
    +		}
    +		receiver = new Receiver(null);
    +		receiver.open(null);
    +		process = Runtime.getRuntime().exec("python -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
    +
    +		new StreamPrinter(process.getInputStream()).start();
    +		new StreamPrinter(process.getErrorStream()).start();
    +	}
    +
    +	private void close() throws IOException, URISyntaxException {
    +		FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH));
    +		hdfs.delete(new Path(FLINK_HDFS_PATH), true);
    +
    +		FileSystem local = FileSystem.getLocalFileSystem();
    +		local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
    +		local.delete(new Path(FLINK_TMP_DATA_DIR), true);
    +
    +		try {
    +			receiver.close();
    +		} catch (NullPointerException npe) {
    +		}
    +		process.destroy();
    +	}
    +
    +	public static void prepareFlinkPythonPackage() throws IOException, URISyntaxException {
    +		String originalFilePath = FLINK_DIR.substring(0, FLINK_DIR.length() - 7) + FLINK_PYTHON_REL_LOCAL_PATH;
    +		String tempFilePath = FLINK_PYTHON_FILE_PATH;
    +		clearPath(tempFilePath);
    +		FileCache.copy(new Path(originalFilePath), new Path(tempFilePath), false);
    +	}
    +
    +	public static void prepareFlinkPythonPackage(String path) throws IOException {
    +		FileCache.copy(new Path(path), new Path(FLINK_PYTHON_FILE_PATH), true);
    +	}
    +
    +	public static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException {
    +		clearPath(FLINK_HDFS_PATH);
    +		FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true);
    +		env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_ID);
    +		clearPath(FLINK_PYTHON_FILE_PATH);
    +	}
    +
    +	private static void clearPath(String path) throws IOException, URISyntaxException {
    +		FileSystem fs = FileSystem.get(new URI(path));
    +		if (fs.exists(new Path(path))) {
    +			fs.delete(new Path(path), true);
    +		}
    +	}
    +
    +	public static String copyFile(String path) throws IOException, URISyntaxException {
    +		if (path.endsWith("/")) {
    +			path = path.substring(0, path.length() - 1);
    +		}
    +		String identifier = path.substring(path.lastIndexOf("/"));
    +		String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
    +		clearPath(tmpFilePath);
    +		FileCache.copy(new Path(path), new Path(tmpFilePath), true);
    +		return identifier;
    +	}
    +
    +	//=====Plan Binding=================================================================================================
    +	protected class PythonOperationInfo extends OperationInfo {
    +		protected static final int INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED = -1;
    +		protected static final int INFO_MODE_UDF_DOUBLE_KEYED_TYPED = 0;
    +		protected static final int INFO_MODE_UDF_DOUBLE_TYPED = 1;
    +		protected static final int INFO_MODE_UDF_SINGLE_TYPED = 2;
    +		protected static final int INFO_MODE_UDF_SINGLE_TYPED_COMBINE = 9;
    +		protected static final int INFO_MODE_UDF = 3;
    +		protected static final int INFO_MODE_GROUP = 4;
    +		protected static final int INFO_MODE_SORT = 5;
    +		protected static final int INFO_MODE_UNION = 6;
    +		protected static final int INFO_MODE_PROJECT = 7;
    +		protected static final int INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED = 8;
    +		protected String operator;
    +		protected String meta;
    +		protected boolean combine;
    +
    +		protected PythonOperationInfo(int mode) throws IOException {
    +			parentID = (Integer) receiver.getRecord();
    +			childID = (Integer) receiver.getRecord();
    +			switch (mode) {
    +				case INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED:
    +					keys1 = (Tuple) receiver.getRecord();
    +					keys2 = (Tuple) receiver.getRecord();
    +					otherID = (Integer) receiver.getRecord();
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					projectionKeys1 = (Tuple) receiver.getRecord();
    +					projectionKeys2 = (Tuple) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED:
    +					otherID = (Integer) receiver.getRecord();
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					projectionKeys1 = (Tuple) receiver.getRecord();
    +					projectionKeys2 = (Tuple) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_DOUBLE_KEYED_TYPED:
    +					keys1 = (Tuple) receiver.getRecord();
    +					keys2 = (Tuple) receiver.getRecord();
    +					otherID = (Integer) receiver.getRecord();
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_DOUBLE_TYPED:
    +					otherID = (Integer) receiver.getRecord();
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_SINGLE_TYPED:
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_SINGLE_TYPED_COMBINE:
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					combine = (Boolean) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF:
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					break;
    +				case INFO_MODE_GROUP:
    +					keys1 = (Tuple) receiver.getRecord();
    +					break;
    +				case INFO_MODE_SORT:
    +					field = (Integer) receiver.getRecord();
    +					order = (Integer) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UNION:
    +					otherID = (Integer) receiver.getRecord();
    +					break;
    +				case INFO_MODE_PROJECT:
    +					keys1 = (Tuple) receiver.getRecord();
    +					types = receiver.getRecord();
    +					break;
    +			}
    +		}
    +	}
    +
    +	@Override
    +	protected PythonOperationInfo createOperationInfo(String identifier) throws IOException {
    +		switch (Operations.valueOf(identifier.toUpperCase())) {
    +			case COGROUP:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
    +			case CROSS:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +			case CROSS_H:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +			case CROSS_T:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +			case FILTER:
    +				return new PythonOperationInfo(INFO_MODE_UDF);
    +			case FLATMAP:
    +				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
    +			case GROUPREDUCE:
    +				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED_COMBINE);
    +			case JOIN:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +			case JOIN_H:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +			case JOIN_T:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +			case MAP:
    +				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
    +			case PROJECTION:
    +				return new PythonOperationInfo(INFO_MODE_PROJECT);
    +			case REDUCE:
    +				return new PythonOperationInfo(INFO_MODE_UDF);
    +			case GROUPBY:
    +				return new PythonOperationInfo(INFO_MODE_GROUP);
    +			case SORT:
    +				return new PythonOperationInfo(INFO_MODE_SORT);
    +			case UNION:
    +				return new PythonOperationInfo(INFO_MODE_UNION);
    +		}
    +		return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
    +	}
    +
    +	@Override
    +	protected DataSet applyCoGroupOperation(DataSet op1, DataSet op2, int[] firstKeys, int[] secondKeys, PythonOperationInfo info) {
    +		return op1.coGroup(op2).where(firstKeys).equalTo(secondKeys).with(new PythonCoGroup(info.operator, info.types, info.meta));
    +	}
    +
    +	public static class PseudoKeySelector<X> implements KeySelector<X, Integer> {
    +		@Override
    +		public Integer getKey(X value) throws Exception {
    +			return 0;
    +		}
    +	}
    +
    +	@Override
    +	protected DataSet applyCrossOperation(DataSet op1, DataSet op2, int mode, PythonOperationInfo info) {
    +		switch (mode) {
    +			case 0:
    +				return op1.join(op2).where(new PseudoKeySelector()).equalTo(new PseudoKeySelector()).with(new PythonCross(info.operator, info.types, info.meta));
    --- End diff --
    
    A Cross is implemented as a join where every pair matches. I don't know the implications of doing it in such a hacky way. (The comparison overhead should be negligible considering the current performance.)
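The idea can be illustrated outside Flink with a toy equi-join. This is a sketch in plain Python, not the Flink join: `hash_join` is a hypothetical helper, and `pseudo_key` plays the role of PseudoKeySelector by giving every record the key 0.

```python
from collections import defaultdict


def hash_join(left, right, key_left, key_right):
    """Plain equi-join: emit every (l, r) pair whose keys are equal."""
    buckets = defaultdict(list)
    for r in right:
        buckets[key_right(r)].append(r)
    return [(l, r) for l in left for r in buckets[key_left(l)]]


def pseudo_key(_value):
    # Same role as PseudoKeySelector: every record gets the key 0.
    return 0


# With a constant key on both sides, the join yields the full cross product.
pairs = hash_join([1, 2], ["a", "b", "c"], pseudo_key, pseudo_key)
```

In this toy version every record lands in a single bucket, which hints at one possible implication: a keyed join has nothing to partition on. Whether that matters for Flink's join strategies is not established here.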



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63971965
  
    I think if you have a flink.Path() object, you can call getFileSystem() or so on it. The FileSystem class you get abstracts away the actual file system (remote, local).
    If you do not add a scheme prefix (file:/// or hdfs:///), it will automatically use the local file system. I think Path() is also able to handle relative paths.
    Maybe it's a good idea to put these "temp" files into the home directory of the user (".flink/python"). The YARN client also does it this way. You should make sure to clean everything up afterwards.



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63883849
  
    Example: to run it locally, change `env.execute()` to `env.execute(local=True)`.
    You are correct that HDFS is currently hardcoded for cluster execution. I didn't know there was such a fancy parameter. When you run something locally, does it automatically point to the local filesystem? That would make the above modification unnecessary, which would be REALLY nice.



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20676686
  
    --- Diff: docs/python_api_guide.md ---
    @@ -0,0 +1,666 @@
    +--- 
    +title: Python Programming Guide
    --- End diff --
    
    I think that's certainly a long term goal. There are probably bigger issues right now that we need to resolve first.



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20677094
  
    --- Diff: docs/python_api_guide.md ---
    @@ -0,0 +1,666 @@
    +--- 
    +title: Python Programming Guide
    +---
    +
    +
    +Python Programming Guide
    --- End diff --
    
    Ah, I didn't see this paragraph.
    I did expect this a bit earlier ;)
    
    Maybe you can add a very quick comment after the initial wordcount example so that users understand how to execute it.



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63985109
  
    If I do this:
    ```
    Path relativePath = new Path(<relativePathAsString>);
    Path absolutePath = relativePath.makeQualified(relativePath.getFileSystem());
    ```
    I get an exception saying that get() requires an absolute path. I'm preparing a PR to fix that.




[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63879867
  
    I copied the wordcount example into a file and tried to run it:
    ```
    robert@robert-tower ...HOT-bin/flink-0.8-incubating-SNAPSHOT (git)-[papipr] % ll
    total 84
    drwxr-xr-x 2 robert robert  4096 Nov 20 21:59 bin/
    drwxr-xr-x 2 robert robert  4096 Nov 20 21:59 conf/
    -rw-r--r-- 1 robert robert   539 Nov 20 21:59 DISCLAIMER
    drwxr-xr-x 3 robert robert  4096 Nov 20 21:59 examples/
    drwxr-xr-x 2 robert robert  4096 Nov 20 21:59 lib/
    -rw-r--r-- 1 robert robert 23843 Nov 20 21:59 LICENSE
    drwxr-xr-x 2 robert robert  4096 Nov 20 22:02 log/
    -rw-r--r-- 1 robert robert 19706 Nov 20 21:59 NOTICE
    -rw-r--r-- 1 robert robert  1348 Nov 20 21:59 README.txt
    drwxr-xr-x 5 robert robert  4096 Nov 20 21:59 resources/
    drwxr-xr-x 2 robert robert  4096 Nov 20 21:59 tools/
    -rw-r--r-- 1 robert robert   928 Nov 20 22:02 wc.py
    robert@robert-tower ...HOT-bin/flink-0.8-incubating-SNAPSHOT (git)-[papipr] % ./bin/pyflink.sh -v  wc.py
    Error: The main method caused an error.
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:445)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:346)
    	at org.apache.flink.client.program.Client.run(Client.java:244)
    	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
    Caused by: java.io.IOException: The file URI 'wc.py' is not valid.  File URIs need to specify aboslute file paths.
    	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:212)
    	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:299)
    	at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:118)
    	at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.prepareFiles(PythonPlanBinder.java:94)
    	at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.go(PythonPlanBinder.java:75)
    	at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.main(PythonPlanBinder.java:62)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:483)
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:430)
    	... 6 more
    ```
    
    It would be convenient for users if the program did not require an absolute path.
    
    2. Error
    ```
    ./bin/pyflink.sh -v  /home/robert/incubator-flink/flink-dist/target/flink-0.8-incubating-SNAPSHOT-bin/flink-0.8-incubating-SNAPSHOT/wc.py
    StreamPrinter: Traceback (most recent call last):
    StreamPrinter:   File "/tmp/flink_plan/flink/connection/Connection.py", line 24, in <module>
    StreamPrinter:     import cStringIO
    StreamPrinter: ImportError: No module named 'cStringIO'
    StreamPrinter: 
    StreamPrinter: During handling of the above exception, another exception occurred:
    StreamPrinter: 
    StreamPrinter: Traceback (most recent call last):
    StreamPrinter:   File "/tmp/flink_plan/plan.py", line 1, in <module>
    StreamPrinter:     from flink.plan.Environment import get_environment
    StreamPrinter:   File "/tmp/flink_plan/flink/plan/Environment.py", line 18, in <module>
    StreamPrinter:     from flink.connection import Connection
    StreamPrinter:   File "/tmp/flink_plan/flink/connection/Connection.py", line 28, in <module>
    StreamPrinter:     import StringIO
    StreamPrinter: ImportError: No module named 'StringIO'
    ```
    
    Maybe we need a section that describes how to install the dependencies? (or at least list them)
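
For the first error above, one way to accept a relative script path is to resolve it against the current working directory before it is handed to the file cache. The real fix would belong in the Java launcher; the Python sketch below only illustrates the idea, and `to_absolute_path` is a hypothetical helper, not part of the PR.

```python
import os


def to_absolute_path(path_arg, cwd=None):
    """Resolve a possibly relative path against cwd (default: current dir)."""
    base = cwd if cwd is not None else os.getcwd()
    # os.path.join discards `base` when `path_arg` is already absolute,
    # so absolute inputs pass through unchanged.
    return os.path.normpath(os.path.join(base, path_arg))
```

With this, `wc.py` started from `/home/robert` would resolve to `/home/robert/wc.py`, while a path that is already absolute is left as it is.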


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63880657
  
    I think the issue is that my Linux distribution uses Python 3 by default. From the source code it seems that you're just calling the "python" binary.
    Maybe we need to make this more flexible.



[GitHub] incubator-flink pull request: [FLINK-337] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20361957
  
    --- Diff: docs/python_api_guide.md ---
    @@ -0,0 +1,666 @@
    +--- 
    +title: Python Programming Guide
    --- End diff --
    
    The documentation is not integrated into the current one for Java & Scala. Should I (try to...) do this now or postpone it?



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63872922
  
    I think it would be pretty bad if we ended up with an API that is a lot slower than our other APIs, has no potential for improvement, and has to be kept because somebody is using it.
    However, if we are confident that the performance can be improved, I am definitely in favor of adding it early to encourage people to use and improve it.
    
    So I am +1 for merging it, if the PR works correctly (even if slowly) and we are optimistic that performance can be tuned.



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63859374
  
    Damn it Robert, why are you making me rethink my PR? :(
    
    I see your point. Maybe I'm oversimplifying things, but my conclusion is that if we want to prevent people from dismissing the Python API completely due to premature assessments, we end up in a catch-up game we can't win. Next time it's going to be missing feature X that's causing it, and after that is fixed we may end up talking about feature Y, or yet again about performance.
    
    My other major gripe is that there are things people could work on while we fix the performance. The PlanBinding stuff is completely irrelevant to performance. So is Windows support (which, surprisingly enough, is not just about paths O.o). I could surely find more things if I wanted to.
    These things will fall flat if we don't merge it, since creating issues (and resolving them) while the code is not in the main project seems a bit iffy.



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63882979
  
    The path issue is fixed, but I'm bundling the changes before making another commit here.
    
    I could have sworn I wrote into the documentation that it doesn't run under Python 3... I know it was in there once.
    
    Anyway, under Python 3 StringIO is part of the io package, whereas in previous versions it was its own module, so a simple use-python-version-xy when starting the process won't work. It will require more work: determine the Python version in use and execute certain code based on that.
    There are other issues with Python 3 as well (like checking classes for long and strings).
    
    Btw.: apart from having Python 2.7 (and possibly lower), no additional packages are required.
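
The version-dependent setup described here (StringIO living in a different place, and `long`/string type checks differing) is commonly handled with a small compatibility shim. The following is a minimal sketch of that general technique, not the code from the PR; the names `PY2`, `string_types`, `integer_types`, `is_string` and `is_integer` are illustrative assumptions.

```python
import sys

PY2 = sys.version_info[0] == 2

if PY2:
    # Python 2: StringIO is its own module; long and unicode are separate types.
    from StringIO import StringIO
    string_types = (str, unicode)  # noqa: F821 (name exists only on Python 2)
    integer_types = (int, long)    # noqa: F821 (name exists only on Python 2)
else:
    # Python 3: StringIO lives in io; int covers long, str covers unicode.
    from io import StringIO
    string_types = (str,)
    integer_types = (int,)


def is_string(value):
    """True for text values on either major Python version."""
    return isinstance(value, string_types)


def is_integer(value):
    """True for int (and long on Python 2); bool is deliberately excluded."""
    return isinstance(value, integer_types) and not isinstance(value, bool)
```

Because the Python 2 names are only referenced inside the `if PY2:` branch, the module still imports cleanly under Python 3.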



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63882316
  
    With the fixed python call, I'm getting this from the example:
    ```
    StreamPrinter: Traceback (most recent call last):
    StreamPrinter:   File "/tmp/flink_plan/plan.py", line 26, in <module>
    StreamPrinter:     data = env.read_text(textfile)
    StreamPrinter: NameError: name 'textfile' is not defined
    ```
    
    It would be cool to a) have an included data set and b) let the user pass the paths from the command line.
    
    Once I've fixed that, I'm getting another error:
    ```
    Error: The main method caused an error.
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:445)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:346)
    	at org.apache.flink.client.program.Client.run(Client.java:244)
    	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
    Caused by: java.io.IOException: The given HDFS file URI (hdfs:/tmp/flink) did not describe the HDFS Namenode. The attempt to use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: Either no default hdfs configuration was registered, or the provided configuration contains no valid hdfs namenode address (fs.default.name or fs.defaultFS) describing the hdfs namenode host and port.
    	at org.apache.flink.runtime.fs.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:281)
    	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:255)
    	at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.clearPath(PythonPlanBinder.java:149)
    	at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.distributeFiles(PythonPlanBinder.java:142)
    	at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.go(PythonPlanBinder.java:78)
    	at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.main(PythonPlanBinder.java:62)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:483)
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:430)
    	... 6 more
    ```
    
    It seems that HDFS is hardcoded. It would be better to just use the "fs.defaultFS" parameter to let the user decide on the file system.
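
The suggestion above to let the user pass the paths from the command line could look roughly like this in the plan script. This is a sketch under the assumption that `pyflink.sh` forwards extra arguments to the script as `sys.argv`, which the thread does not confirm; `parse_paths` is a hypothetical helper.

```python
import sys


def parse_paths(argv, default_input=None, default_output=None):
    """Return (input_path, output_path) from an argv-style list."""
    args = list(argv[1:])  # argv[0] is the script name
    input_path = args[0] if len(args) > 0 else default_input
    output_path = args[1] if len(args) > 1 else default_output
    if input_path is None:
        raise ValueError("usage: wc.py <input path> [<output path>]")
    return input_path, output_path


# Possible use inside the plan script (env.read_text as in the traceback above):
#   textfile, outfile = parse_paths(sys.argv)
#   data = env.read_text(textfile)
```

Supplying `default_input` would also cover the "included data set" case, since the example could then fall back to a bundled file when no path is given.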



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63817898
  
    I know from personal conversations that there are some performance issues with the Python API.
    
    How are we going to proceed with this change? Are we going to merge it with a big "BETA / preview" disclaimer or are we going to fix the performance issues first?
    
    I'm a bit undecided on this. Merging it early would allow us to assess if users are interested in such a feature. On the other hand, people will probably ignore the disclaimer and start benchmarking the python API.
    By merging it early, we see if people from the community are picking it up to develop it further. I don't know.
    What do you think?



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63980978
  
     1. All files will be removed even if the job fails (on the worker nodes; the master comes with the next commit).
     2. Why should they be moved from /tmp to the home directory? The distCache puts files in /tmp; I'd rather have them all in roughly the same place.
     3. No tests.
     4. Instead of making it a configurable value, let's just fix the code to work under Python 3. All relevant code bits are in the setup stage, so they are unrelated to performance.
     5. The StringIO import is actually unused, look at that ^^
    




[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63098764
  
    That will make it easier to look through the code and give comments...



[GitHub] incubator-flink pull request: [FLINK-337] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20361989
  
    --- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/examples/java/python/wordcount/WordCount.java ---
    @@ -0,0 +1,47 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
    + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
    + * License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + */
    +package org.apache.flink.languagebinding.examples.java.python.wordcount;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.operators.GroupReduceOperator;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import static org.apache.flink.languagebinding.api.java.common.PlanBinder.INT;
    +import static org.apache.flink.languagebinding.api.java.common.PlanBinder.STRING;
    +import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;
    +import org.apache.flink.languagebinding.api.java.python.functions.PythonFlatMap;
    +import org.apache.flink.languagebinding.api.java.python.functions.PythonGroupReduce;
    +
    +public class WordCount {
    +	public static void main(String[] args) throws Exception {
    --- End diff --
    
    Will be changed to be in line with the other examples.



[GitHub] incubator-flink pull request: [FLINK-377] [FLINK-671] Generic Inte...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63972208
  
    Regarding Python 2 and Python 3: my operating system uses python3 when you call "python". I think it is really necessary to turn this into a configurable value! I have to call "python2.7" to make it work.
    
    
    Are there any tests included in the Python API?



[GitHub] incubator-flink pull request: [FLINK-337] [FLINK-671] Generic Inte...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20361691
  
    --- Diff: flink-dist/src/main/flink-bin/bin/flink ---
    @@ -48,6 +48,7 @@ CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
     log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-flink-client-$HOSTNAME.log
     log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
     
    +export FLINK_ROOT_DIR
    --- End diff --
    
    I've been unsure about this change for a long time, wondering whether I should instead export only a specific directory. The python package currently resides in /resource/python; is there a more appropriate place to put it?

