You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by fpompermaier <gi...@git.apache.org> on 2014/10/31 17:50:02 UTC

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

GitHub user fpompermaier opened a pull request:

    https://github.com/apache/incubator-flink/pull/172

    Upgraded APIs: Flink (0.8) and HBase (0.98.x) + profiles

    I had to create an hadoop.core.version variable to manage the fact that cloudera has different versions for hadoop-core and hadoop-common..

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fpompermaier/incubator-flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/172.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #172
    
----
commit 78f7c1d84a4edff445c10a60bb94bbe5c6184580
Author: fpompermaier <f....@gmail.com>
Date:   2014-10-31T16:43:58Z

    Upgraded APIs: Flink (0.8) and HBase (0.98.x) + profiles

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-63456042
  
    Before that we have to discuss about how to properly handle the problem of
    the initialization of HTable and Scan object within the configure method.
    We can do it this evening at the meeting or tomorrow during the day, ok?
    
    Best,
    Flavio
    On Nov 18, 2014 12:12 PM, "Stephan Ewen" <no...@github.com> wrote:
    
    > Before we can merge this, can you rebase and squash the commits? That
    > should also eliminate all the changes that are already in the master.
    >
    > Here is a brief guide how to do that:
    > http://flink.incubator.apache.org/how-to-contribute.html
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/incubator-flink/pull/172#issuecomment-63454928>
    > .
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19776871
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    That worked like a charm :)
    So now the only thing left out is the GenericTableOutputFormat..does it need to implement  OutputFormat\<Tuple2\<ImmutableBytesWritable, Result\>\>? In that case, how could writeRecord(Tuple2\<ImmutableBytesWritable, Result\> record) be implemented?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-62414461
  
    Commit 0c8de722e9230180b7f881d411e56e8a4ec0fe45 in this pull request seems to contain a lot of changes already merged into master?
    Github shows me that the PR is touching more than 5k lines of code? Maybe a rebase or merge went wrong?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19743935
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    Yes, probably there's something not handled properly.
    However Result in this case is of type org.apache.hadoop.hbase.client.Result..is this the cause of the problem?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-62183763
  
    Do you think this PR is now ready to be merged? Or are you going to fix Flink serialization of GenericTypes before?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20090518
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -19,186 +19,72 @@
     
     package org.apache.flink.addons.hbase;
    --- End diff --
    
    Which permissions?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728622
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20077721
  
    --- Diff: flink-addons/flink-hbase/pom.xml ---
    @@ -1,22 +1,14 @@
     <?xml version="1.0" encoding="UTF-8"?>
    -<!--
    -Licensed to the Apache Software Foundation (ASF) under one
    -or more contributor license agreements.  See the NOTICE file
    -distributed with this work for additional information
    -regarding copyright ownership.  The ASF licenses this file
    -to you under the Apache License, Version 2.0 (the
    -"License"); you may not use this file except in compliance
    -with the License.  You may obtain a copy of the License at
    -
    -  http://www.apache.org/licenses/LICENSE-2.0
    -
    -Unless required by applicable law or agreed to in writing,
    -software distributed under the License is distributed on an
    -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    -KIND, either express or implied.  See the License for the
    -specific language governing permissions and limitations
    -under the License.
    --->
    +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
    +	license agreements. See the NOTICE file distributed with this work for additional 
    +	information regarding copyright ownership. The ASF licenses this file to 
    +	you under the Apache License, Version 2.0 (the "License"); you may not use 
    +	this file except in compliance with the License. You may obtain a copy of 
    +	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
    +	by applicable law or agreed to in writing, software distributed under the 
    +	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
    +	OF ANY KIND, either express or implied. See the License for the specific 
    +	language governing permissions and limitations under the License. -->
    --- End diff --
    
    The comment looks truncated by an autoformatter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725696
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -310,17 +264,13 @@ public void open(TableInputSplit split) throws IOException {
     
     	@Override
     	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    --- End diff --
    
    The ``minNumSplits`` parameter is not considered when creating the splits. This should not cause the execution to fail but some data sources might end up with no input data.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20093842
  
    --- Diff: flink-addons/flink-hbase/pom.xml ---
    @@ -116,20 +109,74 @@ under the License.
     			</exclusions>
     		</dependency>
     	</dependencies>
    -		<!-- <dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-server</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		<dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-client</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		 -->
     
    -	<!-- hadoop-client is available for yarn and non-yarn, so there is no need 
    -		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
    -		for description of hadoop-clients -->
    +	<profiles>
    +		<profile>
    +			<id>hadoop-1</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop1 -->
    +					<name>!hadoop.profile</name>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop1.version}</hbase.version>
    +			</properties>
    +		</profile>
    +		<profile>
    +			<id>hadoop-2</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop2 -->
    +					<name>hadoop.profile</name>
    +					<value>2</value>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop2.version}</hbase.version>
    +			</properties>
    +			<dependencies>
    +				<!-- Force hadoop-common dependency -->
    +				<dependency>
    +					<groupId>org.apache.hadoop</groupId>
    +					<artifactId>hadoop-common</artifactId>
    +				</dependency>
    +			</dependencies>
    +		</profile>
    +		<profile>
    +			<id>cdh5.1.3</id>
    --- End diff --
    
    What I dislike about the profile is that its very specific about the version. We basically need to manually maintain the CDH versions and force users into specific CDH versions.
    
    Would it be possible to add the `<dependencyManagement>` section with the hadoop-core dependency into the `hadoop2` profile and set the hadoop.core.version to hadoop-2.2.0 by default? 
    This way users could actually specify their specific hadoop versions if they want to build flink against a particular CDH build?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20078222
  
    --- Diff: flink-addons/flink-hbase/pom.xml ---
    @@ -1,22 +1,14 @@
     <?xml version="1.0" encoding="UTF-8"?>
    -<!--
    -Licensed to the Apache Software Foundation (ASF) under one
    -or more contributor license agreements.  See the NOTICE file
    -distributed with this work for additional information
    -regarding copyright ownership.  The ASF licenses this file
    -to you under the Apache License, Version 2.0 (the
    -"License"); you may not use this file except in compliance
    -with the License.  You may obtain a copy of the License at
    -
    -  http://www.apache.org/licenses/LICENSE-2.0
    -
    -Unless required by applicable law or agreed to in writing,
    -software distributed under the License is distributed on an
    -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    -KIND, either express or implied.  See the License for the
    -specific language governing permissions and limitations
    -under the License.
    --->
    +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
    +	license agreements. See the NOTICE file distributed with this work for additional 
    +	information regarding copyright ownership. The ASF licenses this file to 
    +	you under the Apache License, Version 2.0 (the "License"); you may not use 
    +	this file except in compliance with the License. You may obtain a copy of 
    +	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
    +	by applicable law or agreed to in writing, software distributed under the 
    +	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
    +	OF ANY KIND, either express or implied. See the License for the specific 
    +	language governing permissions and limitations under the License. -->
    --- End diff --
    
    Ok, now I should have fixed that


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728778
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -179,26 +155,13 @@ protected Scan createScanner(Configuration parameters) {
     	 *        a {@link Configuration} that holds at least the table name.
     	 */
     	protected HTable createTable(Configuration parameters) {
    -		String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
    -		LOG.info("Got config location: " + configLocation);
    -		if (configLocation != null)
    -		{
    -			org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
    -			if(OperatingSystem.isWindows()) {
    -				dummyConf.addResource(new Path("file:/" + configLocation));
    -			} else {
    -				dummyConf.addResource(new Path("file://" + configLocation));
    -			}
    -			hConf = HBaseConfiguration.create(dummyConf);
    -			;
    -			// hConf.set("hbase.master", "im1a5.internetmemory.org");
    -			LOG.info("hbase master: " + hConf.get("hbase.master"));
    -			LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
    -
    -		}
    +		LOG.info("Initializing HBaseConfiguration");
    +		//use files found in the classpath
    +		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
    --- End diff --
    
    This is what happens normally when you submit a mapreduce job..you include the *-site.xml files in it and then the magic occurs :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20091789
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -19,186 +19,72 @@
     
     package org.apache.flink.addons.hbase;
    --- End diff --
    
    The linux file permissions.
    So github says in the changed file view that the permissions of the file were changed from 644 to 755 (it says "100644 → 100755" in the box's header)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725598
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -329,9 +279,9 @@ public void open(TableInputSplit split) throws IOException {
     				continue;
     			}
     
    -			final String regionLocation = this.table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
    -			final byte[] startRow = this.scan.getStartRow();
    -			final byte[] stopRow = this.scan.getStopRow();
    +			final String regionLocation = table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
    +			final byte[] startRow = scan.getStartRow();
    --- End diff --
    
    ``startRow`` and ``stopRow`` do not change in the loop right? 
    So these assignements could be moved outside of the loop?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728304
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -310,17 +264,13 @@ public void open(TableInputSplit split) throws IOException {
     
     	@Override
     	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    --- End diff --
    
    ok..restored checks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728658
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -59,11 +64,18 @@ public String getStringData() {
     	
     	@Override
     	public void read(DataInputView in) throws IOException {
    -		this.result.readFields(in);
    +		Deserializer<Result> deser = new ResultSerialization().getDeserializer(Result.class);
    +		deser.open(new DataInputViewStream(in));
    +		result = deser.deserialize(null);
    +		deser.close();
     	}
     	
     	@Override
     	public void write(DataOutputView out) throws IOException {
    -		this.result.write(out);	
    +		Serializer<Result> ser = new ResultSerialization().getSerializer(Result.class);
    --- End diff --
    
    So what should I do here? Istantiate a field Serializer in the class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20094611
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -19,186 +19,72 @@
     
     package org.apache.flink.addons.hbase;
    --- End diff --
    
    Thas was unwanted.. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19732516
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -310,17 +264,13 @@ public void open(TableInputSplit split) throws IOException {
     
     	@Override
     	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    +		HTable table = createTable(parameters);
     
    -		if (this.table == null) {
    -			throw new IOException("No table was provided.");
    -		}
    -
    -		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
    -
    +		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
    -
     			throw new IOException("Expecting at least one region.");
     		}
    +		Scan scan = createScanner(parameters);
     		int count = 0;
     		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
    --- End diff --
    
    Do you mean I just have to write:
    List<TableInputSplit> splits = new ArrayList<TableInputSplit>();
    ?
    In that case there are no problem


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19738715
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    Hmm, OK. Avro requires a default constructor which ``Cell`` does not provide.
    So Avro Serialization does not work :-(
    
    Why did you change the logic anyway? Is Result no longer Serializable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19745184
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    I had a look at the outdated API. It seems that ``Result`` does no longer implement the Hadoop Writable interface. Instead, they do their serialization with ProtoBuf... I haven't figured out, how they tell Hadoop how to serialize Result objects.
    
    Can you post the complete Stack trace of the ``NoSuchMethodException``?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19830452
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    Hmm, that sounds like a bug in Flink. 
    The ``Configuration`` passed to ``InputFormats`` in ``configure()`` should not be empty.
    Do you want to create a JIRA issue for the bug you found?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-63454928
  
    Before we can merge this, can you rebase and squash the commits? That should also eliminate all the changes that are already in the master.
    
    Here is a brief guide how to do that: http://flink.incubator.apache.org/how-to-contribute.html
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier closed the pull request at:

    https://github.com/apache/incubator-flink/pull/172


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728805
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -59,11 +64,18 @@ public String getStringData() {
     	
     	@Override
     	public void read(DataInputView in) throws IOException {
    -		this.result.readFields(in);
    +		Deserializer<Result> deser = new ResultSerialization().getDeserializer(Result.class);
    +		deser.open(new DataInputViewStream(in));
    +		result = deser.deserialize(null);
    +		deser.close();
     	}
     	
     	@Override
     	public void write(DataOutputView out) throws IOException {
    -		this.result.write(out);	
    +		Serializer<Result> ser = new ResultSerialization().getSerializer(Result.class);
    --- End diff --
    
    I hope you do not need the wrapper class anymore. As I said, the system should be able to de/serialize any class. 
    
    If you change the bindings of the HBaseInputFormat from ``Record`` to ``Tuple2<ImmutableBytesWritable, Result>`` then there is no need to wrap ``ImmutableBytesWritable`` and ``Result``


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19844652
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    No, haven't tried to reproduce it. 
    Is the Configuration object empty (no keys and values) or is the object just null?
    I can also open an issue, just wanted to make sure that we do not forget about it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725947
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -45,6 +40,8 @@
     import org.apache.hadoop.hbase.util.Bytes;
     import org.apache.hadoop.hbase.util.Pair;
     import org.apache.hadoop.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     /**
      * {@link InputFormat} subclass that wraps the access for HTables.
    --- End diff --
    
    The InputFormat is still typed for the deprecated Java Record API (it uses the Record class).
    To port it to the new API, it should be changed to
    ``public class TableInputFormat implements InputFormat<Tuple2<ImmutableBytesWritable, Result>, TableInputSplit>``


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728917
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -59,11 +64,18 @@ public String getStringData() {
     	
     	@Override
     	public void read(DataInputView in) throws IOException {
    -		this.result.readFields(in);
    +		Deserializer<Result> deser = new ResultSerialization().getDeserializer(Result.class);
    --- End diff --
    
    Ok..now I got it: changing from Result to Tuple does not require HBaseResult and HBaseKey anymore :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20089597
  
    --- Diff: flink-addons/flink-hbase/pom.xml ---
    @@ -116,20 +109,74 @@ under the License.
     			</exclusions>
     		</dependency>
     	</dependencies>
    -		<!-- <dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-server</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		<dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-client</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		 -->
     
    -	<!-- hadoop-client is available for yarn and non-yarn, so there is no need 
    -		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
    -		for description of hadoop-clients -->
    +	<profiles>
    +		<profile>
    +			<id>hadoop-1</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop1 -->
    +					<name>!hadoop.profile</name>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop1.version}</hbase.version>
    +			</properties>
    +		</profile>
    +		<profile>
    +			<id>hadoop-2</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop2 -->
    +					<name>hadoop.profile</name>
    +					<value>2</value>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop2.version}</hbase.version>
    +			</properties>
    +			<dependencies>
    +				<!-- Force hadoop-common dependency -->
    +				<dependency>
    +					<groupId>org.apache.hadoop</groupId>
    +					<artifactId>hadoop-common</artifactId>
    +				</dependency>
    +			</dependencies>
    +		</profile>
    +		<profile>
    +			<id>cdh5.1.3</id>
    --- End diff --
    
    Why do we need this additional profile?
    Can't users select the hadoop2 profile and then set the specific hadoop and hbase versions through properties, like `-Dhbase.version=0.98.1-cdh5` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19732628
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -310,17 +264,13 @@ public void open(TableInputSplit split) throws IOException {
     
     	@Override
     	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    +		HTable table = createTable(parameters);
     
    -		if (this.table == null) {
    -			throw new IOException("No table was provided.");
    -		}
    -
    -		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
    -
    +		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
    -
     			throw new IOException("Expecting at least one region.");
     		}
    +		Scan scan = createScanner(parameters);
     		int count = 0;
     		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
    --- End diff --
    
    I'd do ``List splits = new ArrayList(minNumSplits());``


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19732203
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    What exactly do you mean by first time and next times?
    ``configure()`` is invoked once in the JobManager to compute the input splits. Then the class is serialized and shipped to all TaskManagers (TM). In each TM thread, the ``configure()`` method is called just once, before ``open()`` is called for each processed InputSplit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20090375
  
    --- Diff: flink-addons/flink-hbase/pom.xml ---
    @@ -116,20 +109,74 @@ under the License.
     			</exclusions>
     		</dependency>
     	</dependencies>
    -		<!-- <dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-server</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		<dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-client</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		 -->
     
    -	<!-- hadoop-client is available for yarn and non-yarn, so there is no need 
    -		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
    -		for description of hadoop-clients -->
    +	<profiles>
    +		<profile>
    +			<id>hadoop-1</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop1 -->
    +					<name>!hadoop.profile</name>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop1.version}</hbase.version>
    +			</properties>
    +		</profile>
    +		<profile>
    +			<id>hadoop-2</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop2 -->
    +					<name>hadoop.profile</name>
    +					<value>2</value>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop2.version}</hbase.version>
    +			</properties>
    +			<dependencies>
    +				<!-- Force hadoop-common dependency -->
    +				<dependency>
    +					<groupId>org.apache.hadoop</groupId>
    +					<artifactId>hadoop-common</artifactId>
    +				</dependency>
    +			</dependencies>
    +		</profile>
    +		<profile>
    +			<id>cdh5.1.3</id>
    --- End diff --
    
    Unfortunately cloudera hbase 0.98.1-cdh5.1.3 requires hadoop-commons 2.3.0-cdh5.1.3  which requires hadoop-core 2.3.0-mr1-cdh5.1.3. Without specifying this profile for cloudera it is not possible to manage properly this dependency.
    I don't know why Cloudera did this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20136851
  
    --- Diff: flink-addons/flink-hbase/pom.xml ---
    @@ -116,20 +109,74 @@ under the License.
     			</exclusions>
     		</dependency>
     	</dependencies>
    -		<!-- <dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-server</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		<dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-client</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		 -->
     
    -	<!-- hadoop-client is available for yarn and non-yarn, so there is no need 
    -		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
    -		for description of hadoop-clients -->
    +	<profiles>
    +		<profile>
    +			<id>hadoop-1</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop1 -->
    +					<name>!hadoop.profile</name>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop1.version}</hbase.version>
    +			</properties>
    +		</profile>
    +		<profile>
    +			<id>hadoop-2</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop2 -->
    +					<name>hadoop.profile</name>
    +					<value>2</value>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop2.version}</hbase.version>
    +			</properties>
    +			<dependencies>
    +				<!-- Force hadoop-common dependency -->
    +				<dependency>
    +					<groupId>org.apache.hadoop</groupId>
    +					<artifactId>hadoop-common</artifactId>
    +				</dependency>
    +			</dependencies>
    +		</profile>
    +		<profile>
    +			<id>cdh5.1.3</id>
    --- End diff --
    
    I think that's fine if you add a little comment explaining the variable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-61467949
  
    What do you mean for OF..?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725351
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    Why did you go for an empty ``configure()`` method? 
    Creating a table and a scanner does probably include communication with the HBase master, which does not come for free. Depending on how expensive these operations are, it might make sense to only do this once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728913
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -59,11 +64,18 @@ public String getStringData() {
     	
     	@Override
     	public void read(DataInputView in) throws IOException {
    -		this.result.readFields(in);
    +		Deserializer<Result> deser = new ResultSerialization().getDeserializer(Result.class);
    +		deser.open(new DataInputViewStream(in));
    +		result = deser.deserialize(null);
    +		deser.close();
     	}
     	
     	@Override
     	public void write(DataOutputView out) throws IOException {
    -		this.result.write(out);	
    +		Serializer<Result> ser = new ResultSerialization().getSerializer(Result.class);
    --- End diff --
    
    Ok..now I got it: changing from Result to Tuple does not require HBaseResult and HBaseKey anymore :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728659
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -179,26 +155,13 @@ protected Scan createScanner(Configuration parameters) {
     	 *        a {@link Configuration} that holds at least the table name.
     	 */
     	protected HTable createTable(Configuration parameters) {
    -		String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
    -		LOG.info("Got config location: " + configLocation);
    -		if (configLocation != null)
    -		{
    -			org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
    -			if(OperatingSystem.isWindows()) {
    -				dummyConf.addResource(new Path("file:/" + configLocation));
    -			} else {
    -				dummyConf.addResource(new Path("file://" + configLocation));
    -			}
    -			hConf = HBaseConfiguration.create(dummyConf);
    -			;
    -			// hConf.set("hbase.master", "im1a5.internetmemory.org");
    -			LOG.info("hbase master: " + hConf.get("hbase.master"));
    -			LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
    -
    -		}
    +		LOG.info("Initializing HBaseConfiguration");
    +		//use files found in the classpath
    +		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
    --- End diff --
    
    So, a user would need to assemble a custom jar file that includes the hbase-site.xml config file.
    Not sure if that is practical. How about adding an optional hbase configuration to the constructor?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728336
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -329,9 +279,9 @@ public void open(TableInputSplit split) throws IOException {
     				continue;
     			}
     
    -			final String regionLocation = this.table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
    -			final byte[] startRow = this.scan.getStartRow();
    -			final byte[] stopRow = this.scan.getStopRow();
    +			final String regionLocation = table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
    +			final byte[] startRow = scan.getStartRow();
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19736693
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    The parameter passed to configure


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-61465860
  
    Sorry Fabian, I've read the comments just now..I'll try to make some changes accordingly to them


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728792
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -310,17 +264,13 @@ public void open(TableInputSplit split) throws IOException {
     
     	@Override
     	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    +		HTable table = createTable(parameters);
     
    -		if (this.table == null) {
    -			throw new IOException("No table was provided.");
    -		}
    -
    -		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
    -
    +		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
    -
     			throw new IOException("Expecting at least one region.");
     		}
    +		Scan scan = createScanner(parameters);
     		int count = 0;
     		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
    --- End diff --
    
    Could you explain it better please?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19733560
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    The ``TableInputFormat`` object is serialized and shipped to the JM and TMs. So all member variables of the ``TableInputFormat`` class are serialized and shipped as well (if not declared transient). This should also work for a ``Configuration`` object. You set the ``Configuration`` as a member in the constructor, so this object should also be serialized and be available when ``configure()``, ``open()``, or any other method is called. 
    
    Which ``Configuration`` object is empty, the member variable or the parameter passed to ``configure()``?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19745693
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    ```
    client.JobClient: java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.hbase.Cell.<init>()
    	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
    	at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
    	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
    	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    	at org.apache.avro.reflect.ReflectDatumReader.readObjectArray(ReflectDatumReader.java:150)
    	at org.apache.avro.reflect.ReflectDatumReader.readJavaArray(ReflectDatumReader.java:135)
    	at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:125)
    	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
    	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    	at org.apache.flink.api.java.typeutils.runtime.AvroSerializer.deserialize(AvroSerializer.java:126)
    	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:115)
    	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    	at org.apache.flink.runtime.plugable.DeserializationDelegate.read(DeserializationDelegate.java:56)
    	at org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer.getNextRecord(AdaptiveSpanningRecordDeserializer.java:71)
    	at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:182)
    	at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    	at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    	at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:172)
    	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NoSuchMethodException: org.apache.hadoop.hbase.Cell.<init>()
    	at java.lang.Class.getConstructor0(Class.java:2892)
    	at java.lang.Class.getDeclaredConstructor(Class.java:2058)
    	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:310)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19730890
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    I had also the problem that parameters were correct the first time and was empty the next times, so I had to move it to the constructor..why parameters get resetted?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-61460535
  
    I had a look at the PR and added some comments.
    It would be good to port the InputFormat to the new API and avoid the custom de/serialization logic in the wrappers if possible. I am not so much familiar with our Maven setup and did not comment on these changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20094591
  
    --- Diff: flink-addons/flink-hbase/pom.xml ---
    @@ -116,20 +109,74 @@ under the License.
     			</exclusions>
     		</dependency>
     	</dependencies>
    -		<!-- <dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-server</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		<dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-client</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		 -->
     
    -	<!-- hadoop-client is available for yarn and non-yarn, so there is no need 
    -		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
    -		for description of hadoop-clients -->
    +	<profiles>
    +		<profile>
    +			<id>hadoop-1</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop1 -->
    +					<name>!hadoop.profile</name>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop1.version}</hbase.version>
    +			</properties>
    +		</profile>
    +		<profile>
    +			<id>hadoop-2</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop2 -->
    +					<name>hadoop.profile</name>
    +					<value>2</value>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop2.version}</hbase.version>
    +			</properties>
    +			<dependencies>
    +				<!-- Force hadoop-common dependency -->
    +				<dependency>
    +					<groupId>org.apache.hadoop</groupId>
    +					<artifactId>hadoop-common</artifactId>
    +				</dependency>
    +			</dependencies>
    +		</profile>
    +		<profile>
    +			<id>cdh5.1.3</id>
    --- End diff --
    
    Yes you could but then you have to introduce the hadoop.core.version variable also in the root pom..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-63720791
  
    CLosing this to open a new one


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-62415304
  
    Yes..I'm not very familiar with Git..before committing the last changes I synched with the latest version of the master and the I recommitted...however you could just compare the root pom, the flink-addon pom and the flink-hbase extension and do a merge..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725797
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -59,11 +64,18 @@ public String getStringData() {
     	
     	@Override
     	public void read(DataInputView in) throws IOException {
    -		this.result.readFields(in);
    +		Deserializer<Result> deser = new ResultSerialization().getDeserializer(Result.class);
    --- End diff --
    
    Creating two objects for each deserialization is very expensive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19740411
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    Since Result is a native Hadoop type (implements Writable) it should be treated as such and serialized with its own logic and not with Avro. Something on Flink's side does not seem to work correctly. I'll have a look at that. Until then, I suggest to put this PR aside.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19732836
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    The problem is that in my TableInputFormat I have to read parameters like TableInputFormat.INPUT_TABLE and the only  way I found to pass them to it is to pass the to the constructor (i.e. new MyTableInputFormat(conf)).
    Indeed, those coming from configure do not contains those parameters..how can I put TableInputFormat.INPUT_TABLE parameter in those arriving to the TM through the configure method?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19732895
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -310,17 +264,13 @@ public void open(TableInputSplit split) throws IOException {
     
     	@Override
     	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    +		HTable table = createTable(parameters);
     
    -		if (this.table == null) {
    -			throw new IOException("No table was provided.");
    -		}
    -
    -		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
    -
    +		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
    -
     			throw new IOException("Expecting at least one region.");
     		}
    +		Scan scan = createScanner(parameters);
     		int count = 0;
     		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
    --- End diff --
    
    Ok. done (in my local version)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728214
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -179,26 +155,13 @@ protected Scan createScanner(Configuration parameters) {
     	 *        a {@link Configuration} that holds at least the table name.
     	 */
     	protected HTable createTable(Configuration parameters) {
    -		String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
    -		LOG.info("Got config location: " + configLocation);
    -		if (configLocation != null)
    -		{
    -			org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
    -			if(OperatingSystem.isWindows()) {
    -				dummyConf.addResource(new Path("file:/" + configLocation));
    -			} else {
    -				dummyConf.addResource(new Path("file://" + configLocation));
    -			}
    -			hConf = HBaseConfiguration.create(dummyConf);
    -			;
    -			// hConf.set("hbase.master", "im1a5.internetmemory.org");
    -			LOG.info("hbase master: " + hConf.get("hbase.master"));
    -			LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
    -
    -		}
    +		LOG.info("Initializing HBaseConfiguration");
    +		//use files found in the classpath
    +		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
    --- End diff --
    
    HBase configuration is created depending on the submitted jar and the hbase-site.xml files found in the classpath


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20137745
  
    --- Diff: flink-addons/flink-hbase/pom.xml ---
    @@ -116,20 +109,74 @@ under the License.
     			</exclusions>
     		</dependency>
     	</dependencies>
    -		<!-- <dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-server</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		<dependency>
    -			<groupId>org.apache.hbase</groupId>
    -			<artifactId>hbase-client</artifactId>
    -			<version>${hbase.version}</version>
    -		</dependency>
    -		 -->
     
    -	<!-- hadoop-client is available for yarn and non-yarn, so there is no need 
    -		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
    -		for description of hadoop-clients -->
    +	<profiles>
    +		<profile>
    +			<id>hadoop-1</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop1 -->
    +					<name>!hadoop.profile</name>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop1.version}</hbase.version>
    +			</properties>
    +		</profile>
    +		<profile>
    +			<id>hadoop-2</id>
    +			<activation>
    +				<property>
    +					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +					<!--hadoop2 -->
    +					<name>hadoop.profile</name>
    +					<value>2</value>
    +				</property>
    +			</activation>
    +			<properties>
    +				<hbase.version>${hbase.hadoop2.version}</hbase.version>
    +			</properties>
    +			<dependencies>
    +				<!-- Force hadoop-common dependency -->
    +				<dependency>
    +					<groupId>org.apache.hadoop</groupId>
    +					<artifactId>hadoop-common</artifactId>
    +				</dependency>
    +			</dependencies>
    +		</profile>
    +		<profile>
    +			<id>cdh5.1.3</id>
    --- End diff --
    
    Ok done and pushed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19740015
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    I tried to remove HBaseResult and HBaseKey using directly Tuple2<ImmutableBytesWritable, Result> as you suggested but this does not work because of Avro serialization..at the moment I was trying to use the following:
    
    @Override
    	public Tuple2<ImmutableBytesWritable, Result> nextRecord(
    			Tuple2<ImmutableBytesWritable, Result> reuse) throws IOException {
    		if (this.tableRecordReader == null)
    		{
    			throw new IOException("No table record reader provided!");
    		}
    
    		try {
    			if (this.tableRecordReader.nextKeyValue())
    			{
    				Result currentValue = tableRecordReader.getCurrentValue();
    				currentValue.setExists(Boolean.TRUE);//fix null pointer in avro serialization
    				ImmutableBytesWritable currentKey = tableRecordReader.getCurrentKey();
    				reuse.setField(currentKey, 0);
    				reuse.setField(currentValue, 1);
    				return reuse;
    			} else
    			{
    				this.endReached = true;
    			}
    		} catch (InterruptedException e) {
    			LOG.error("Table reader has been interrupted", e);
    			throw new IOException(e);
    		}
    		return null;
    	}
    
    How do I have to proceed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19843418
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    Have you been able to reproduce it or do I have to trace the stack of the calls?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19747240
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    Hi,
    
    can you try to replace line 86 in `GenericTypeInfo` by 
    
    `return new KryoSerializer<T>(this.typeClass);`
    
    and check if that would solve the problem?
    
    We want to switch from Avro to Kryo for serialization of generic objects in the future anyways, but are currently waiting for some improvements that would make Kryo better perform.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19739899
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    Just checked. Result should be a Hadoop data type which should be natively handled by Flink (not as a generic Object which is done via Avro). 
    So, Flink should be capable of handling ``Result``. Might be a problem with Flink's internal type handling.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725521
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -310,17 +264,13 @@ public void open(TableInputSplit split) throws IOException {
     
     	@Override
     	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    +		HTable table = createTable(parameters);
     
    -		if (this.table == null) {
    -			throw new IOException("No table was provided.");
    -		}
    -
    -		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
    -
    +		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
    -
     			throw new IOException("Expecting at least one region.");
     		}
    +		Scan scan = createScanner(parameters);
     		int count = 0;
     		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
    --- End diff --
    
    Not sure what's the upper bound of regions in an HBase table is. Wouldn't it be better to use minNumSplits as the init value for the length of the ``splits`` list. It will automatically increase its size anyway ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19844539
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    The HBaseTableOutputFormat should be of type <T extends Tuple> like the CSVOutputFormat or the JDBCOutputFormat. Depending on the configuration of the OutputFormat, the fields of the tuple are used as row keys or written to specified columns of the HBase table. The JDBCOutputFormat is doing something similar by writing Tuple data to a relational database table.
    In order to make this work, the OutputFormat requires a bit of configuration including the Hbase table, schema information, key information, etc.
    Does that make sense?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19737823
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    Unfortunately I can't make it work..using avro for the serialization I get this error:
    
    java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.hbase.Cell.<init>()
    	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
    	at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
    
    Do I have to configure something else?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r20089750
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -19,186 +19,72 @@
     
     package org.apache.flink.addons.hbase;
    --- End diff --
    
    Why did you change the permissions of this file?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725427
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -179,26 +155,13 @@ protected Scan createScanner(Configuration parameters) {
     	 *        a {@link Configuration} that holds at least the table name.
     	 */
     	protected HTable createTable(Configuration parameters) {
    -		String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
    -		LOG.info("Got config location: " + configLocation);
    -		if (configLocation != null)
    -		{
    -			org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
    -			if(OperatingSystem.isWindows()) {
    -				dummyConf.addResource(new Path("file:/" + configLocation));
    -			} else {
    -				dummyConf.addResource(new Path("file://" + configLocation));
    -			}
    -			hConf = HBaseConfiguration.create(dummyConf);
    -			;
    -			// hConf.set("hbase.master", "im1a5.internetmemory.org");
    -			LOG.info("hbase master: " + hConf.get("hbase.master"));
    -			LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
    -
    -		}
    +		LOG.info("Initializing HBaseConfiguration");
    +		//use files found in the classpath
    +		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
    --- End diff --
    
    Have you checked if using an empty configuration is working in a distributed setting? 
    Where does the hostname of the HBase master come from? Is it maybe using a default value (localhost) which works on in a local setup only?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19729002
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -179,26 +155,13 @@ protected Scan createScanner(Configuration parameters) {
     	 *        a {@link Configuration} that holds at least the table name.
     	 */
     	protected HTable createTable(Configuration parameters) {
    -		String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
    -		LOG.info("Got config location: " + configLocation);
    -		if (configLocation != null)
    -		{
    -			org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
    -			if(OperatingSystem.isWindows()) {
    -				dummyConf.addResource(new Path("file:/" + configLocation));
    -			} else {
    -				dummyConf.addResource(new Path("file://" + configLocation));
    -			}
    -			hConf = HBaseConfiguration.create(dummyConf);
    -			;
    -			// hConf.set("hbase.master", "im1a5.internetmemory.org");
    -			LOG.info("hbase master: " + hConf.get("hbase.master"));
    -			LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
    -
    -		}
    +		LOG.info("Initializing HBaseConfiguration");
    +		//use files found in the classpath
    +		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
    --- End diff --
    
    Alright, I think I got it. Sounds good :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725808
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -59,11 +64,18 @@ public String getStringData() {
     	
     	@Override
     	public void read(DataInputView in) throws IOException {
    -		this.result.readFields(in);
    +		Deserializer<Result> deser = new ResultSerialization().getDeserializer(Result.class);
    +		deser.open(new DataInputViewStream(in));
    +		result = deser.deserialize(null);
    +		deser.close();
     	}
     	
     	@Override
     	public void write(DataOutputView out) throws IOException {
    -		this.result.write(out);	
    +		Serializer<Result> ser = new ResultSerialization().getSerializer(Result.class);
    --- End diff --
    
    Same here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-61466895
  
    Ah, OK. The OF is serialized for shipping it to the TaskManagers. On the TMs configure is called once again.
    So you can simply make ``Scan`` and ``HTable`` transient members to avoid their serialization.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19728981
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -310,17 +264,13 @@ public void open(TableInputSplit split) throws IOException {
     
     	@Override
     	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    +		HTable table = createTable(parameters);
     
    -		if (this.table == null) {
    -			throw new IOException("No table was provided.");
    -		}
    -
    -		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
    -
    +		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
    -
     			throw new IOException("Expecting at least one region.");
     		}
    +		Scan scan = createScanner(parameters);
     		int count = 0;
     		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
    --- End diff --
    
    If there is a huge table with a several thousand regions, the initial size of the split list would be that big even if only a few regions should be read. If we start with a smaller value, the list would grow by need (with a bit of overhead for reorganization of cause).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-62302025
  
    I hope to finish the Kryo integration next week. Once this is in, the GenericTypeInfo will return a Kryo serializer which should be able to handle more types than Avro. This will also include support for abstract types and interfaces.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-61466415
  
    First comment:
     - now Scan and HTable are no more serializable so that was the cause I removed the two fields


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19832577
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    Great! 
    For the TableOutputFormat, it might be better to have something like the JDBCOutputFormat or the CSVOutputFormat, i.e., it receives a Tuple data type and assigns the individual fields of the tuples to HBase keys and/or columns. However, I'm not familiar with HBase's API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/172#issuecomment-61468557
  
    OutputFormat ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19843584
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    The problem with this is that an HBase DataSource is of type <ImmutableBytesWritable, Result> but result is not writable..How do you suggest to proceed? How should be typed of the OutputFormat? Tuple2<ImmutableBytesWritable, Result> or another type?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19958851
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -108,14 +90,16 @@
     
     	protected HBaseResult hbaseResult;
     
    -	private org.apache.hadoop.conf.Configuration hConf;
    +	private Configuration parameters;
     
    +//	private org.apache.hadoop.conf.Configuration hConf;
    +	public TableInputFormat(Configuration parameters){
    +		this.parameters = parameters;
    +	}
    +	
     	@Override
     	public void configure(Configuration parameters) {
    -		HTable table = createTable(parameters);
    -		setTable(table);
    -		Scan scan = createScanner(parameters);
    -		setScan(scan);
    +		//TODO why parameters gets empty after execution??
    --- End diff --
    
    The configure method is called by the following:
    
    DataSourceNode.computeOperatorSpecificDefaultEstimates -> EMPTY!
    InputFormatVertex.initializeOnMaster
    DataSourceTask.initInputFormat


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Upgraded APIs: Flink (0.8) and HBase...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/172#discussion_r19725771
  
    --- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java ---
    @@ -21,10 +21,15 @@
     
     import java.io.IOException;
     
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStream;
     import org.apache.flink.types.Value;
     import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    +import org.apache.hadoop.io.serializer.Deserializer;
    +import org.apache.hadoop.io.serializer.Serializer;
     
     public class HBaseResult implements Value {
    --- End diff --
    
    I think, we don't need the wrappers for HBaseResult and HBaseKey anymore.
    The new API allows to process arbitrary objects and serializes them with Avro (will be changed to Kryo some time in the future). 
    But you should double check if this is working.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---