Posted to issues@flink.apache.org by nielsbasjes <gi...@git.apache.org> on 2016/08/03 13:07:55 UTC

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

GitHub user nielsbasjes opened a pull request:

    https://github.com/apache/flink/pull/2330

    FLINK-4311 Fixed several problems in TableInputFormat

    Question: Do you guys want a unit test for this?
    I have done this in HBase itself in the past, but it required a large chunk of additional software to start and stop an HBase minicluster during the unit tests.
    I.e., pull in this class:
    https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
    and then do something like this:
    https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestScanRowPrefix.java
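
    For illustration, a minimal sketch of such a test, assuming a FilterTestingCluster-like base class (its createTable/openTable helpers and their signatures are assumptions here, not verified API):

        import org.apache.hadoop.hbase.TableName;
        import org.apache.hadoop.hbase.client.HTable;
        import org.apache.hadoop.hbase.client.Put;
        import org.junit.Test;

        public class TableInputFormatMiniClusterTest extends FilterTestingCluster {

            @Test
            public void testReadBackRow() throws Exception {
                TableName tableName = TableName.valueOf("testTable");
                createTable(tableName, "F");          // assumed base-class helper
                HTable table = openTable(tableName);  // assumed base-class helper

                Put put = new Put("row1".getBytes());
                put.add("F".getBytes(), "Col".getBytes(), "value1".getBytes());
                table.put(put);
                table.close();

                // ... then run the TableInputFormat against the minicluster
                // table and assert that "row1" is read back.
            }
        }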


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nielsbasjes/flink FLINK-4311

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2330
    
----
commit 5c3d53c810f8df6d5544685ef3f1004c46541daf
Author: Niels Basjes <nb...@bol.com>
Date:   2016-08-03T12:54:34Z

    [FLINK-4311] TableInputFormat can handle reuse for next input split

commit 8696f5e257c7434d62e662c4c97f4ede2da5411b
Author: Niels Basjes <nb...@bol.com>
Date:   2016-08-03T12:56:01Z

    [FLINK-4311] Cannot override a static member function.

----


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79469241
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -93,32 +102,45 @@ private HTable createTable() {
     	}
     
     	@Override
    +	public void open(TableInputSplit split) throws IOException {
    +		if (split == null) {
    +			throw new IOException("Input split is null!");
    +		}
    +
    +		logSplitInfo("opening", split);
    +		scan.setStartRow(split.getStartRow());
    +		lastRow = split.getEndRow();
    +		scan.setStopRow(lastRow);
    +
    +		resultScanner = table.getScanner(scan);
    +		endReached = false;
    +		scannedRows = 0;
    +	}
    +
    +	@Override
     	public boolean reachedEnd() throws IOException {
     		return this.endReached;
     	}
     
     	@Override
     	public T nextRecord(T reuse) throws IOException {
    -		if (this.rs == null){
    +		if (this.resultScanner == null){
     			throw new IOException("No table result scanner provided!");
     		}
     		try{
    --- End diff --
    
    insert space


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r73480826
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -237,7 +244,7 @@ private void logSplitInfo(String action, TableInputSplit split) {
     	 *        End key of the region
     	 * @return true, if this region needs to be included as part of the input (default).
     	 */
    -	private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    +	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    --- End diff --
    
    According to the documentation, this function is intended to be overridden in a subclass. You cannot override a static method (and especially not a private one).
    http://stackoverflow.com/questions/2223386/why-doesnt-java-allow-overriding-of-static-methods
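
    A small self-contained example of the pitfall (hypothetical class names):

        class Base {
            static String who() { return "base"; }
        }

        class Sub extends Base {
            // This does not override Base.who(); static methods are only hidden.
            static String who() { return "sub"; }
        }

        public class StaticHidingDemo {
            public static void main(String[] args) {
                Base b = new Sub();
                // Resolved against the compile-time type Base, so this prints "base":
                System.out.println(b.who());
            }
        }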


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79472506
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormat.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestTableInputFormat extends HBaseTestingClusterAutostarter {
    +	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
    +	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
    +	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
    +
    +	// These are the row ids AND also the values we will put in the test table
    +	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
    +
    +	@Before
    +	public void createTestTable() throws IOException {
    +		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
    +		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
    +		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
    +		HTable table = openTable(tableName);
    +
    +		for (String rowId : ROW_IDS) {
    +			byte[] rowIdBytes = rowId.getBytes();
    +			Put p = new Put(rowIdBytes);
    +			// Use the rowId as the value to facilitate the testing better
    +			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
    +			table.put(p);
    +		}
    +
    +		table.close();
    +	}
    +
    +	class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
    +		@Override
    +		protected Scan getScanner() {
    +			return new Scan();
    +		}
    +
    +		@Override
    +		protected String getTableName() {
    +			return TEST_TABLE_NAME;
    +		}
    +
    +		@Override
    +		protected Tuple1<String> mapResultToTuple(Result r) {
    +			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
    +		}
    +	}
    +
    +	@Test
    +	public void testTableInputFormat() {
    --- End diff --
    
    Can we make this test a bit more lightweight and not execute a Flink program?
    Instead we could test the interface methods of the InputFormat such as:
    - createInputSplits
    - configure
    - open
    - nextRecord
    - close
    
    etc.
    
    If you split the test into several methods, please make sure that HBase is only initialized once, using `@BeforeClass`.
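
    A rough sketch of what that could look like, reusing the `InputFormatForTestTable` from the diff above (untested, and the assertion is a placeholder):

        @Test
        public void testInputFormatDirectly() throws Exception {
            TableInputFormat<Tuple1<String>> format = new InputFormatForTestTable();
            format.configure(new org.apache.flink.configuration.Configuration());

            TableInputSplit[] splits = format.createInputSplits(1);
            List<String> rows = new ArrayList<>();
            for (TableInputSplit split : splits) {
                format.open(split);
                while (!format.reachedEnd()) {
                    Tuple1<String> record = format.nextRecord(new Tuple1<String>());
                    if (record != null) {
                        rows.add(record.f0);
                    }
                }
                format.close();
            }
            // assert on the collected rows here
        }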


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79816134
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
    --- End diff --
    
    Done


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79816123
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -67,18 +66,23 @@
     	protected abstract T mapResultToTuple(Result r);
     
     	/**
    -	 * creates a {@link Scan} object and a {@link HTable} connection
    +	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
    +	 * These are opened here because they are needed in the createInputSplits
    +	 * which is called before the openInputFormat method.
    +	 * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
     	 *
    -	 * @param parameters
    +	 * @param parameters The configuration that is to be used
     	 * @see Configuration
     	 */
     	@Override
     	public void configure(Configuration parameters) {
    -		this.table = createTable();
    -		this.scan = getScanner();
    +		table = createTable();
    +		scan = getScanner();
    --- End diff --
    
    Done. However, because `configure()` doesn't have a way to fail nicely, I added those checks to the other methods.


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Now I see why I missed these two; they are newer than the 1.0.3 release I was working with.
    Would it be a good idea to add `throws IOException` to these two methods in RichInputFormat?


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I would say yes, since `open()` and `close()` can also throw an `IOException`.
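
    For illustration, the change would amount to the two lifecycle methods declaring the checked exception (a sketch, not the actual Flink source):

        import java.io.IOException;

        public abstract class RichInputFormatSketch {
            public void openInputFormat() throws IOException {
                // no-op by default; subclasses may acquire resources here
            }

            public void closeInputFormat() throws IOException {
                // no-op by default; subclasses may release resources here
            }
        }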


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2330


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I disabled tests for the hadoop1 profile. 
    Will build the PR one more time and merge if everything passes.


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79816145
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
    +	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
    +	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
    +	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
    +
    +	// These are the row ids AND also the values we will put in the test table
    +	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
    +
    +	@Before
    +	public void createTestTable() throws IOException {
    +		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
    +		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
    +		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
    +		HTable table = openTable(tableName);
    +
    +		for (String rowId : ROW_IDS) {
    +			byte[] rowIdBytes = rowId.getBytes();
    +			Put p = new Put(rowIdBytes);
    +			// Use the rowId as the value to facilitate the testing better
    +			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
    +			table.put(p);
    +		}
    +
    +		table.close();
    +	}
    +
    +	class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
    +		@Override
    +		protected Scan getScanner() {
    +			return new Scan();
    +		}
    +
    +		@Override
    +		protected String getTableName() {
    +			return TEST_TABLE_NAME;
    +		}
    +
    +		@Override
    +		protected Tuple1<String> mapResultToTuple(Result r) {
    +			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
    +		}
    +	}
    +
    +	@Test
    +	public void testTableInputFormat() {
    +		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    +		environment.setParallelism(1);
    +
    +		DataSet<String> resultDataSet =
    +			environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
    +				@Override
    +				public String map(Tuple1<String> value) throws Exception {
    +					return value.f0;
    +				}
    +			});
    +
    +		List<String> resultSet = new ArrayList<>();
    +		resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
    +
    +		try {
    +			environment.execute("HBase InputFormat Test");
    +		} catch (Exception e) {
    +			Assert.fail("HBase InputFormat test failed. " + e.getMessage());
    +		}
    +
    +		for (String rowId : ROW_IDS) {
    --- End diff --
    
    Done


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Yes, that is indeed the right place to do this. 
    Bummer that this method does not allow throwing exceptions.


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79477333
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml ---
    @@ -1,43 +0,0 @@
    -<?xml version="1.0"?>
    -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    -<!--
    -/**
    - *
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements.  See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership.  The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License.  You may obtain a copy of the License at
    - *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    --->
    -<configuration>
    --- End diff --
    
    I'm not so familiar with HBase. Is the config no longer required for HBase 1.1.2, or why did you remove it?


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Hi @nielsbasjes, thanks for fixing and cleaning up the `TableInputFormat`. 
    This PR is good to merge.


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Oh damn, I just noticed a major issue in this: in order to create the input splits, the table needs to be available "before" the call to the `open()` method.


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Question: Is this change good?
    Or are there more things I need to change before it can be committed?


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Maybe you can move the table initialization into `openInputFormat()` (called once, before all splits) and close the table in `closeInputFormat()` (called once, after all splits).
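
    As a toy model of that lifecycle (illustrative names, not the real TableInputFormat):

        import java.io.Closeable;
        import java.io.IOException;

        abstract class PerTaskResourceFormat {
            protected Closeable table; // stands in for the HTable connection

            protected abstract Closeable createTable() throws IOException;

            public void openInputFormat() throws IOException {
                // called once per task, before the first split is opened
                table = createTable();
            }

            public void closeInputFormat() throws IOException {
                // called once per task, after the last split is closed
                if (table != null) {
                    table.close();
                    table = null;
                }
            }
        }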


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r73382033
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -237,7 +244,7 @@ private void logSplitInfo(String action, TableInputSplit split) {
     	 *        End key of the region
     	 * @return true, if this region needs to be included as part of the input (default).
     	 */
    -	private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    +	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    --- End diff --
    
    why are you changing this?


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    The current version has a problem in building the shaded jars.
    It runs into an infinite loop while creating the dependency-reduced-pom.xml, as described here:
    
    **Shade Plugin gets stuck in infinite loop building dependency reduced POM** https://issues.apache.org/jira/browse/MSHADE-148
    Although all my versions are newer than the fix described there, I still see the problem.



---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79469513
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -131,37 +153,27 @@ public T nextRecord(T reuse) throws IOException {
     	}
     
     	@Override
    -	public void open(TableInputSplit split) throws IOException {
    -		if (split == null){
    -			throw new IOException("Input split is null!");
    -		}
    -		if (table == null){
    -			throw new IOException("No HTable provided!");
    -		}
    -		if (scan == null){
    -			throw new IOException("No Scan instance provided");
    +	public void close() throws IOException {
    +		LOG.info("Closing split (scanned {} rows)", scannedRows);
    +		this.lastRow = null;
    +		try {
    +			if(resultScanner !=null) {
    --- End diff --
    
    insert space between `!=` and `null`


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I'll propose to drop the hadoop1 builds on the dev ML.


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79469197
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -93,32 +102,45 @@ private HTable createTable() {
     	}
     
     	@Override
    +	public void open(TableInputSplit split) throws IOException {
    +		if (split == null) {
    +			throw new IOException("Input split is null!");
    +		}
    +
    +		logSplitInfo("opening", split);
    +		scan.setStartRow(split.getStartRow());
    +		lastRow = split.getEndRow();
    +		scan.setStopRow(lastRow);
    +
    +		resultScanner = table.getScanner(scan);
    +		endReached = false;
    +		scannedRows = 0;
    +	}
    +
    +	@Override
     	public boolean reachedEnd() throws IOException {
     		return this.endReached;
     	}
     
     	@Override
     	public T nextRecord(T reuse) throws IOException {
    -		if (this.rs == null){
    +		if (this.resultScanner == null){
    --- End diff --
    
    add a space before `{`


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I will add a unit test for this.


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79468879
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -67,15 +67,24 @@
     	protected abstract T mapResultToTuple(Result r);
     
     	/**
    -	 * creates a {@link Scan} object and a {@link HTable} connection
    -	 *
    -	 * @param parameters
    +	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
    +	 * These are opened here because they are needed in the createInputSplits
    +	 * which is called before the openInputFormat method.
    +	 * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
    +	 * @param parameters The configuration that is to be used
     	 * @see Configuration
     	 */
     	@Override
     	public void configure(Configuration parameters) {
    -		this.table = createTable();
    -		this.scan = getScanner();
    +		table = createTable();
    +		scan = getScanner();
    +	}
    +
    +	/**
    +	 * Do nothing.
    +	 */
    +	@Override
    +	public void openInputFormat() throws IOException {
    --- End diff --
    
    No need to override this method.


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r73381826
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -328,7 +328,11 @@ public void run() {
     				synchronized (checkpointLock) {
     					LOG.info("Reader terminated, and exiting...");
     
    -					this.format.closeInputFormat();
    +					try {
    +						this.format.closeInputFormat();
    +					} catch (IOException e) {
    +						// Ignoring
    --- End diff --
    
    it would be good to log the exception


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79791290
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
    --- End diff --
    
    Please rename to `TableInputFormatITCase`.


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I managed to resolve the problems with running these unit tests.
    These problems were caused by version conflicts in guava.
    Now an HBase minicluster is started, a table with multiple regions is created, and the TableInputFormat is used to extract the rows again. By setting the parallelism to 1, the same TableInputFormat instance is used for multiple regions and succeeds (the problem this all started with).
    
    Please review.


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r73482199
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -328,7 +328,11 @@ public void run() {
     				synchronized (checkpointLock) {
     					LOG.info("Reader terminated, and exiting...");
     
    -					this.format.closeInputFormat();
    +					try {
    +						this.format.closeInputFormat();
    +					} catch (IOException e) {
    +						// Ignoring
    --- End diff --
    
    Done


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79791171
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -67,18 +66,23 @@
     	protected abstract T mapResultToTuple(Result r);
     
     	/**
    -	 * creates a {@link Scan} object and a {@link HTable} connection
    +	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
    +	 * These are opened here because they are needed in the createInputSplits
    +	 * which is called before the openInputFormat method.
    +	 * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
     	 *
    -	 * @param parameters
    +	 * @param parameters The configuration that is to be used
     	 * @see Configuration
     	 */
     	@Override
     	public void configure(Configuration parameters) {
    -		this.table = createTable();
    -		this.scan = getScanner();
    +		table = createTable();
    +		scan = getScanner();
    --- End diff --
    
    Can you add a check that `table` and `scan` are properly initialized, i.e., `!= null`?
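
    For example (a sketch; since `configure()` cannot declare checked exceptions, an unchecked exception is the only way to fail here):

        import org.apache.flink.configuration.Configuration;
        import org.apache.hadoop.hbase.client.HTable;
        import org.apache.hadoop.hbase.client.Scan;

        public abstract class GuardedConfigureSketch {
            protected HTable table;
            protected Scan scan;

            protected abstract HTable createTable();
            protected abstract Scan getScanner();

            public void configure(Configuration parameters) {
                table = createTable();
                scan = getScanner();
                if (table == null || scan == null) {
                    throw new RuntimeException("HBase table or scan was not initialized.");
                }
            }
        }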


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79791682
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
    +	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
    +	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
    +	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
    +
    +	// These are the row ids AND also the values we will put in the test table
    +	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
    +
    +	@Before
    +	public void createTestTable() throws IOException {
    +		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
    +		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
    +		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
    +		HTable table = openTable(tableName);
    +
    +		for (String rowId : ROW_IDS) {
    +			byte[] rowIdBytes = rowId.getBytes();
    +			Put p = new Put(rowIdBytes);
    +			// Use the rowId as the value to facilitate the testing better
    +			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
    +			table.put(p);
    +		}
    +
    +		table.close();
    +	}
    +
    +	class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
    +		@Override
    +		protected Scan getScanner() {
    +			return new Scan();
    +		}
    +
    +		@Override
    +		protected String getTableName() {
    +			return TEST_TABLE_NAME;
    +		}
    +
    +		@Override
    +		protected Tuple1<String> mapResultToTuple(Result r) {
    +			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
    +		}
    +	}
    +
    +	@Test
    +	public void testTableInputFormat() {
    +		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    +		environment.setParallelism(1);
    +
    +		DataSet<String> resultDataSet =
    +			environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
    +				@Override
    +				public String map(Tuple1<String> value) throws Exception {
    +					return value.f0;
    +				}
    +			});
    +
    +		List<String> resultSet = new ArrayList<>();
    +		resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
    +
    +		try {
    +			environment.execute("HBase InputFormat Test");
    +		} catch (Exception e) {
    +			Assert.fail("HBase InputFormat test failed. " + e.getMessage());
    +		}
    +
    +		for (String rowId : ROW_IDS) {
    --- End diff --
    
    Please add a check that `ROW_IDS` and `resultSet` have the same size to ensure that each record is read exactly once.
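
    For example, extending the loop above (sketch):

        Assert.assertEquals("Each row must be read exactly once",
            ROW_IDS.length, resultSet.size());
        for (String rowId : ROW_IDS) {
            assertTrue("Missing row: " + rowId, resultSet.contains(rowId));
        }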


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79791053
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -67,18 +66,23 @@
     	protected abstract T mapResultToTuple(Result r);
    --- End diff --
    
    Can we remove the confusing comment `"abstract methods allow for multiple table and scanners in the same job"` and add JavaDocs to all abstract methods that describe what is expected from their implementation?
    
    A `TableInputFormat` instance should only scan a single table. In case more tables need to be read, each could be read with a separate `TableInputFormat` instance and the output of those could be unioned if needed.
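
    For example (a sketch; `Table1InputFormat` and `Table2InputFormat` are hypothetical subclasses, one per table):

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple1<String>> tableA = env.createInput(new Table1InputFormat());
        DataSet<Tuple1<String>> tableB = env.createInput(new Table2InputFormat());
        DataSet<Tuple1<String>> both = tableA.union(tableB);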


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I made a few serious attempts to create a unit test that fires up the HBase minicluster ... and failed.


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I had another look at the "multiple tables" question. The name of the table comes from the `getTableName()` method, which is to be implemented by the subclass. I consider it extremely unlikely that multiple calls to that method on a single instance will yield different table names.


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Merging


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Hi @nielsbasjes, I just posted to the dev mailing list and proposed updating the HBase dependency to 1.2.3 (as [FLINK-2765](https://issues.apache.org/jira/browse/FLINK-2765) suggests). By the end of the week we will have a decision, and I will then merge this PR to the master branch.
    
    In the meantime, I will merge the fixed TableInputFormat changes to the Flink 1.1 branch and revert all breaking changes (pom.xml, RichInputFormat, hbase-site.xml, tests, ...). 
    For Flink 1.2.0 we want these changes.
    
    Thanks, Fabian


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79791990
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml ---
    @@ -1,43 +0,0 @@
    -<?xml version="1.0"?>
    -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    -<!--
    -/**
    - *
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements.  See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership.  The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License.  You may obtain a copy of the License at
    - *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    --->
    -<configuration>
    --- End diff --
    
    I see, thanks.
    I think so far this file has been used as a template. I'm not sure how valuable it is to keep.


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79470876
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormat.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestTableInputFormat extends HBaseTestingClusterAutostarter {
    --- End diff --
    
    Long running integration tests have to follow the `*ITCase` naming pattern. This will cause them to be executed in Maven's verify phase.


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I noticed building this PR for hadoop1 (`mvn clean install -Dhadoop.profile=1`) fails:
    
    > The following artifacts could not be resolved: org.apache.hadoop:hadoop-hdfs:jar:tests:1.2.1, org.apache.hbase:hbase-hadoop2-compat:jar:tests:0.98.11-hadoop1: Could not find artifact org.apache.hadoop:hadoop-hdfs:jar:tests:1.2.1 in central
    
    I'm not a Maven guru. Is it possible to disable compiling and executing the tests for the hadoop1 profile?


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by nielsbasjes <gi...@git.apache.org>.
Github user nielsbasjes commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    Note that this version still assumes that a single instance will only see multiple splits belonging to the same table. Is that a safe assumption?


---

[GitHub] flink issue #2330: FLINK-4311 Fixed several problems in TableInputFormat

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I don't know, and it seems the InputFormat itself doesn't know either. If we go by the previous implementation, then yes, there will only be one table. However, based on the comment on line 64 (`// abstract methods allow for multiple table and scanners in the same job`), we have to conclude that there can be different tables.
    
    I'd be curious what @twalthr thinks about this.


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79469573
  
    --- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
    @@ -131,37 +153,27 @@ public T nextRecord(T reuse) throws IOException {
     	}
     
     	@Override
    -	public void open(TableInputSplit split) throws IOException {
    -		if (split == null){
    -			throw new IOException("Input split is null!");
    -		}
    -		if (table == null){
    -			throw new IOException("No HTable provided!");
    -		}
    -		if (scan == null){
    -			throw new IOException("No Scan instance provided");
    +	public void close() throws IOException {
    +		LOG.info("Closing split (scanned {} rows)", scannedRows);
    +		this.lastRow = null;
    +		try {
    +			if(resultScanner !=null) {
    +				this.resultScanner.close();
    +			}
    +		} finally {
    +			this.resultScanner = null;
     		}
    -
    -		logSplitInfo("opening", split);
    -		scan.setStartRow(split.getStartRow());
    -		lastRow = split.getEndRow();
    -		scan.setStopRow(lastRow);
    -
    -		this.rs = table.getScanner(scan);
    -		this.endReached = false;
    -		this.scannedRows = 0;
     	}
     
     	@Override
    -	public void close() throws IOException {
    -		if(rs!=null){
    -			this.rs.close();
    -		}
    -		if(table!=null){
    -			this.table.close();
    +	public void closeInputFormat() throws IOException {
    +		try {
    +			if(table!=null) {
    --- End diff --
    
    insert spaces


---

[GitHub] flink pull request #2330: FLINK-4311 Fixed several problems in TableInputFor...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79477173
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties ---
    @@ -15,9 +15,16 @@
     # specific language governing permissions and limitations
     # under the License.
     
    -log4j.rootLogger=${hadoop.root.logger}
    -hadoop.root.logger=INFO,console
    -log4j.appender.console=org.apache.log4j.ConsoleAppender
    -log4j.appender.console.target=System.err
    -log4j.appender.console.layout=org.apache.log4j.PatternLayout
    -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
    +log4j.rootLogger=DEBUG, stdout, file
    +log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    +log4j.appender.stdout.Target=System.out
    +log4j.appender.stdout.threshold=INFO
    +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n
    +## file appender
    --- End diff --
    
    Can you remove the file appender configuration? It creates a 3.5 MB file and makes the test heavier. We are suffering from long build times and try to keep new tests as lightweight as possible. Thanks!


---