Posted to user@flink.apache.org by Oliver Swoboda <os...@gmail.com> on 2016/11/03 12:57:35 UTC

Using Flink with Accumulo

Hello,

I'm using Flink with Accumulo and wanted to read data from the database by
using the createHadoopInput function. For that I configure an
AccumuloInputFormat. You can find the source code here:
https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
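[For readers without the repository at hand, the setup described above looks roughly like the following. This is a minimal sketch based on the Accumulo 1.x mapreduce API and the Flink DataSet API of that era; the instance name, ZooKeeper host, table name, and credentials are placeholders, not values from the linked code.]

```java
// Sketch: reading Accumulo entries into a Flink DataSet via the
// Hadoop compatibility layer. All connection values are placeholders.
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;

public class AccumuloSource {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Configure the Accumulo input format on a Hadoop Job object.
        Job job = Job.getInstance();
        AccumuloInputFormat.setConnectorInfo(job, "user", new PasswordToken("password"));
        AccumuloInputFormat.setZooKeeperInstance(job,
                ClientConfiguration.loadDefault()
                        .withInstance("accumulo")
                        .withZkHosts("master:2181"));
        AccumuloInputFormat.setInputTableName(job, "timeseries");

        // Each input split becomes one Flink task; with autoAdjustRanges
        // enabled (the default) splits follow tablet boundaries.
        DataSet<Tuple2<Key, Value>> data =
                env.createHadoopInput(new AccumuloInputFormat(), Key.class, Value.class, job);

        data.first(10).print();
    }
}
```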

I'm using a 5-node cluster (1 Master, 4 Workers).
Accumulo is installed with Ambari and has 1 Master Server on the Master
Node and 4 Tablet Servers (one on each Worker).
Flink is installed standalone with the Jobmanager on the Master Node and 4
Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks, so
there are 32 in total.

First problem I have:
If I start several Flink Jobs the client count for Zookeeper in the
Accumulo Overview is constantly increasing. I assume that the used
scanner isn't correctly closed. The client count only decreases to
normal values when I restart Flink.

Second problem I have:
I want to compare aggregations on time series data with Accumulo (with
Iterators) and with Flink. Unfortunately, the results vary inexplicably
when I'm using Flink. I wanted to compare the results for a full table scan
(called baseline in the code), but sometimes it takes 17-18 minutes and
sometimes it's between 30 and 60 minutes. In the longer case I can see in
the Accumulo Overview that after some time only one worker is left with
running scans and there are just a few entries/s scanned (4 million at the
beginning when all workers are running, down to 200k when only the one
worker is left). Because there are 2.5 billion records to scan and almost
500 million still left, it takes really long.
This problem doesn't occur with Accumulo using Iterators and a batch
scanner on the master node: each scan has an almost identical duration, and
the graphs in the Accumulo Overview for entries/s, MB/s scanned and seeks
are the same for each scan.

Yours faithfully,
Oliver Swoboda

Re: Using Flink with Accumulo

Posted by Josh Elser <el...@apache.org>.

Oliver Swoboda wrote:
> Hi Josh, thank you for your quick answer!
>
> 2016-11-03 17:03 GMT+01:00 Josh Elser <elserj@apache.org
> <ma...@apache.org>>:
>
>     Hi Oliver,
>
>     Cool stuff. I wish I knew more about Flink to make some better
>     suggestions. Some points inline, and sorry in advance if I suggest
>     something outright wrong. Hopefully someone from the Flink side can
>     help give context where necessary :)
>
>     Oliver Swoboda wrote:
>
>         Hello,
>
>         I'm using Flink with Accumulo and wanted to read data from the
>         database
>         by using the createHadoopInput function. Therefore I configure an
>         AccumuloInputFormat. The source code you can find here:
>         https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
>
>         I'm using a 5 Node Cluster (1 Master, 4 Worker).
>         Accumulo is installed with Ambari and has 1 Master Server on the
>         Master
>         Node and 4 Tablet Servers (one on each Worker).
>         Flink is installed standalone with the Jobmanager on the Master
>         Node and
>         4 Taskmanagers (one on each Worker). Every Taskmanager can have
>         4 Tasks,
>         so there are 32 in total.
>
>         First problem I have:
>         If I start several Flink Jobs the client count for Zookeeper in the
>         Accumulo Overview is constantly increasing. I assume that the used
>         scanner isn't correctly closed. The client count only decreases to
>         normal values when I restart Flink.
>
>
>     Hrm, this does seem rather bad. Eventually, you'll saturate the
>     connections to ZK and ZK itself will start limiting new connections
>     (per the maxClientCnxns property).
>
>     This sounds somewhat familiar to
>     https://issues.apache.org/jira/browse/ACCUMULO-2113. The lack of a
>     proper "close()" method on the Instance interface is a known
>     deficiency. I'm not sure how Flink execution happens, so I am kind
>     of just guessing.
>
>     You might be able to try to use the CleanUp[1] utility to close out
>     the thread pools/connections when your Flink "task" is done.
>
>
> Unfortunately, that didn't work. I guess that's because Flink starts the
> tasks with the scanners via a TaskManager and I can't access those tasks
> from my program. So after the task is done, I can't close the
> connections with the utility, because the thread where I use it hasn't
> started the scanners.

I see. I'm sorry, but I just don't have any other suggestion to give 
you. I'm not familiar with Flink's execution model. Good luck.

>         Second problem I have:
>         I want to compare aggregations on time series data with Accumulo
>         (with
>         Iterators) and with Flink. Unfortunately, the results vary
>         inexplicably
>         when I'm using Flink. I wanted to compare the results for a full
>         table
>         scan (called baseline in the code), but sometimes it takes 17-18
>         minutes
>         and sometimes it's between 30 and 60 minutes. In the longer case
>         I can
>         see in the Accumulo Overview that after some time only one worker is
>         left with running scans and there are just a few entries/s scanned (4
>         million at the beginning when all workers are running to 200k
>         when the
>         one worker is left). Because there are 2.5 billion records to
>         scan and
>         almost 500 million left it takes really long.
>         This problem doesn't occur with Accumulo using Iterators and a batch
>         scanner on the master node, each scan has almost identical
>         durations and
>         graphics in the Accumulo Overview for entries/s, MB/s scanned
>         and seeks
>         are for each scan the same.
>
>
>     It sounds like maybe your partitioning was sub-optimal and caused
>     one task to get a majority of the data? Having the
>     autoAdjustRanges=true (as you do by default) should help get many
>     batches of work based on the tablet boundaries in Accumulo. I'm not
>     sure how Flink actually executes them though.
>
>
> The problem was that half of the data was on one node after a restart of
> Accumulo. It seems to have something to do with the problem
> described here: https://issues.apache.org/jira/browse/ACCUMULO-4353. I
> stopped and then started Accumulo instead of doing a restart, and then
> the data was distributed evenly across all nodes. For my tests I keep
> Accumulo running now, because after each restart the data distribution
> changes and I don't want to upgrade to 1.8.

Your tablet distribution was not even? The Master should be 
automatically managing this for you, but this will not happen if there 
are offline tablets that are not getting assigned. Looking in the Master 
log for messages about the balancer should help you here. You do not 
need to upgrade to 1.8.0 to get an even distribution of data across all 
nodes in Accumulo.

One trick might be to restart the Accumulo master if everything appears 
to be OK otherwise.
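[Checking the Master log and the current tablet distribution, as suggested above, might look roughly like this; the log path and credentials are assumptions, not values from this thread:]

```shell
# Look for balancer activity (or complaints) in the Accumulo Master log.
# The log directory is an assumption; adjust it for your installation.
grep -i "balancer" /var/log/accumulo/master_*.log | tail -n 20

# Count tablets per tablet server by scanning the location entries in
# the metadata table; an uneven distribution shows up immediately.
accumulo shell -u root -p secret \
  -e "scan -t accumulo.metadata -c loc" | awk '{print $NF}' | sort | uniq -c
```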

>         Yours faithfully,
>         Oliver Swoboda
>
>
>
>     [1]
>     https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36
>
>

Re: Using Flink with Accumulo

Posted by Oliver Swoboda <os...@gmail.com>.
Hi Josh, thank you for your quick answer!

2016-11-03 17:03 GMT+01:00 Josh Elser <el...@apache.org>:

> Hi Oliver,
>
> Cool stuff. I wish I knew more about Flink to make some better
> suggestions. Some points inline, and sorry in advance if I suggest
> something outright wrong. Hopefully someone from the Flink side can help
> give context where necessary :)
>
> Oliver Swoboda wrote:
>
>> Hello,
>>
>> I'm using Flink with Accumulo and wanted to read data from the database
>> by using the createHadoopInput function. Therefore I configure an
>> AccumuloInputFormat. The source code you can find here:
>> https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
>>
>> I'm using a 5 Node Cluster (1 Master, 4 Worker).
>> Accumulo is installed with Ambari and has 1 Master Server on the Master
>> Node and 4 Tablet Servers (one on each Worker).
>> Flink is installed standalone with the Jobmanager on the Master Node and
>> 4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
>> so there are 32 in total.
>>
>> First problem I have:
>> If I start several Flink Jobs the client count for Zookeeper in the
>> Accumulo Overview is constantly increasing. I assume that the used
>> scanner isn't correctly closed. The client count only decreases to
>> normal values when I restart Flink.
>>
>
> Hrm, this does seem rather bad. Eventually, you'll saturate the
> connections to ZK and ZK itself will start limiting new connections (per
> the maxClientCnxns property).
>
> This sounds somewhat familiar to
> https://issues.apache.org/jira/browse/ACCUMULO-2113. The lack of a proper "close()" method on the
> Instance interface is a known deficiency. I'm not sure how Flink execution
> happens, so I am kind of just guessing.
>
> You might be able to try to use the CleanUp[1] utility to close out the
> thread pools/connections when your Flink "task" is done.


Unfortunately, that didn't work. I guess that's because Flink starts the
tasks with the scanners via a TaskManager and I can't access those tasks
from my program. So after the task is done, I can't close the connections
with the utility, because the thread where I use it hasn't started the
scanners.

>> Second problem I have:
>> I want to compare aggregations on time series data with Accumulo (with
>> Iterators) and with Flink. Unfortunately, the results vary inexplicably
>> when I'm using Flink. I wanted to compare the results for a full table
>> scan (called baseline in the code), but sometimes it takes 17-18 minutes
>> and sometimes it's between 30 and 60 minutes. In the longer case I can
>> see in the Accumulo Overview that after some time only one worker is
>> left with running scans and there are just a few entries/s scanned (4
>> million at the beginning when all workers are running to 200k when the
>> one worker is left). Because there are 2.5 billion records to scan and
>> almost 500 million left it takes really long.
>> This problem doesn't occur with Accumulo using Iterators and a batch
>> scanner on the master node, each scan has almost identical durations and
>> graphics in the Accumulo Overview for entries/s, MB/s scanned and seeks
>> are for each scan the same.
>>
>
> It sounds like maybe your partitioning was sub-optimal and caused one task
> to get a majority of the data? Having the autoAdjustRanges=true (as you do
> by default) should help get many batches of work based on the tablet
> boundaries in Accumulo. I'm not sure how Flink actually executes them
> though.
>

The problem was that half of the data was on one node after a restart of
Accumulo. It seems to have something to do with the problem described
here: https://issues.apache.org/jira/browse/ACCUMULO-4353. I stopped and
then started Accumulo instead of doing a restart, and then the data was
distributed evenly across all nodes. For my tests I keep Accumulo running
now, because after each restart the data distribution changes and I
don't want to upgrade to 1.8.

>> Yours faithfully,
>> Oliver Swoboda
>>
>
>
> [1] https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36
>
>

Re: Using Flink with Accumulo

Posted by Josh Elser <el...@apache.org>.
Hi Oliver,

Cool stuff. I wish I knew more about Flink to make some better 
suggestions. Some points inline, and sorry in advance if I suggest 
something outright wrong. Hopefully someone from the Flink side can help 
give context where necessary :)

Oliver Swoboda wrote:
> Hello,
>
> I'm using Flink with Accumulo and wanted to read data from the database
> by using the createHadoopInput function. Therefore I configure an
> AccumuloInputFormat. The source code you can find here:
> https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
> <https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java>
>
> I'm using a 5 Node Cluster (1 Master, 4 Worker).
> Accumulo is installed with Ambari and has 1 Master Server on the Master
> Node and 4 Tablet Servers (one on each Worker).
> Flink is installed standalone with the Jobmanager on the Master Node and
> 4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
> so there are 32 in total.
>
> First problem I have:
>> If I start several Flink Jobs the client count for Zookeeper in the
> Accumulo Overview is constantly increasing. I assume that the used
> scanner isn't correctly closed. The client count only decreases to
> normal values when I restart Flink.

Hrm, this does seem rather bad. Eventually, you'll saturate the 
connections to ZK and ZK itself will start limiting new connections (per 
the maxClientCnxns property).
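[For reference, the ZooKeeper-side limit mentioned above lives in zoo.cfg; 60 is ZooKeeper's shipped default, not a value from this thread:]

```
# zoo.cfg: maximum number of concurrent connections a single client IP
# may open to one ZooKeeper server; 0 disables the limit entirely.
maxClientCnxns=60
```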

This sounds somewhat familiar to 
https://issues.apache.org/jira/browse/ACCUMULO-2113. The lack of a 
proper "close()" method on the Instance interface is a known deficiency. 
I'm not sure how Flink execution happens, so I am kind of just guessing.

You might be able to try to use the CleanUp[1] utility to close out the 
thread pools/connections when your Flink "task" is done.
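[For illustration, invoking the CleanUp utility looks like this; placing it in a RichMapFunction's close() is only an assumption about where Flink user code could run it, and Oliver's follow-up elsewhere in the thread explains why this approach didn't help in his case:]

```java
// Illustration of Accumulo's CleanUp utility (Accumulo 1.x). Where to
// call it in a Flink job is an assumption, not a confirmed fix.
import org.apache.accumulo.core.util.CleanUp;
import org.apache.flink.api.common.functions.RichMapFunction;

public class CleanupMapper extends RichMapFunction<String, String> {
    @Override
    public String map(String value) {
        return value;
    }

    @Override
    public void close() {
        // Shut down Accumulo client thread pools and ZooKeeper
        // connections that were created in this JVM.
        CleanUp.shutdownNow();
    }
}
```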

> Second problem I have:
> I want to compare aggregations on time series data with Accumulo (with
> Iterators) and with Flink. Unfortunately, the results vary inexplicably
> when I'm using Flink. I wanted to compare the results for a full table
> scan (called baseline in the code), but sometimes it takes 17-18 minutes
> and sometimes it's between 30 and 60 minutes. In the longer case I can
> see in the Accumulo Overview that after some time only one worker is
> left with running scans and there are just a few entries/s scanned (4
> million at the beginning when all workers are running to 200k when the
> one worker is left). Because there are 2.5 billion records to scan and
> almost 500 million left it takes really long.
> This problem doesn't occur with Accumulo using Iterators and a batch
> scanner on the master node, each scan has almost identical durations and
> graphics in the Accumulo Overview for entries/s, MB/s scanned and seeks
> are for each scan the same.

It sounds like maybe your partitioning was sub-optimal and caused one 
task to get a majority of the data? Having the autoAdjustRanges=true (as 
you do by default) should help get many batches of work based on the 
tablet boundaries in Accumulo. I'm not sure how Flink actually executes 
them though.
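[For context, the setting referred to above is configured on the input format before the job runs. A sketch against the Accumulo 1.x mapreduce API, not code from Oliver's job:]

```java
// With auto-adjust enabled (the default), each requested range is split
// at tablet boundaries, so every tablet becomes its own input split and
// the scan work can spread across all tablet servers.
AccumuloInputFormat.setRanges(job, Collections.singleton(new Range()));
AccumuloInputFormat.setAutoAdjustRanges(job, true);
```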

> Yours faithfully,
> Oliver Swoboda


[1] 
https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36

