Posted to user@drill.apache.org by Benjamin Schaff <be...@reactivecore.com> on 2020/01/21 22:59:55 UTC

Embedding Drill as a distributed query engine

Hi everyone,

I would like to see if you could provide some recommendations/help around
integrating Apache Drill as a distributed SQL engine in a custom database.
Maybe I am going about it the wrong way, so any feedback is appreciated.

What I would like to achieve is to embed drillbits into my database nodes.
It's a distributed database written mostly in Scala, so it runs inside the
JVM. As you would expect, each storage node holds a partition of the data,
and I would like each SubScan to be routed to the drillbit instance embedded
within that database node.

At this point, drillbits are running and communicating properly with
ZooKeeper (which I am also using for the database). I can connect to the
plugin I created using sqlline, and I can list schemas and tables. So
basically, all the metadata part is done and working.

I managed to build up the partition work and affinity using the distributed
metadata of the database, and I am stuck in the following situation.

If I override the "DistributionAffinity getDistributionAffinity()" method
to return "HARD", then I end up with the following error:
"IllegalArgumentException: Sender fragment endpoint list should not be
empty", and the "applyAssignments" method of the GroupScan receives an
empty list of endpoints.

If I don't override it, then nodes without "local access" get some work
scheduled.
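
For context, here is a trimmed sketch of the overrides in question in my
GroupScan. The class name, the partition-host list, and the omitted methods
stand in for our own code, and the EndpointAffinity calls reflect my best
understanding of the Drill API:

import java.util.ArrayList;
import java.util.List;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

public class MyDbGroupScan extends AbstractGroupScan {

  // Drillbits embedded in the DB nodes that hold a partition,
  // resolved from our distributed metadata.
  private List<DrillbitEndpoint> partitionHosts;

  private List<DrillbitEndpoint> assignments;

  @Override
  public DistributionAffinity getDistributionAffinity() {
    return DistributionAffinity.HARD; // the override that triggers the error
  }

  @Override
  public List<EndpointAffinity> getOperatorAffinity() {
    // One mandatory endpoint per drillbit holding a local partition.
    List<EndpointAffinity> affinities = new ArrayList<>();
    for (DrillbitEndpoint ep : partitionHosts) {
      EndpointAffinity affinity = new EndpointAffinity(ep, 1.0);
      affinity.setAssignmentRequired();
      affinities.add(affinity);
    }
    return affinities;
  }

  @Override
  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
    // With HARD affinity, this list arrives empty; that is the problem.
    this.assignments = endpoints;
  }

  // Constructor, getSpecificScan, getMaxParallelizationWidth,
  // getNewWithChildren, etc. omitted for brevity.
}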

I was wondering if there was a way to exclude drillbits from becoming a
Foreman.

Thanks in advance for any guidance.

-- 
*This e-mail and any
attachments may contain confidential information and 
is intended for use solely
by the addressee(s).  If you are not the

intended recipient of this e-mail, please be aware that any dissemination,

distribution, copying, or other use of the e-mail in whole or in part, is

strictly prohibited.  If you have
received this e-mail in error, please 
notify the sender and permanently delete
the original and all copies of the 
e-mail, attachments, and any printouts. * **

Re: Embedding Drill as a distributed query engine

Posted by Ted Dunning <te...@gmail.com>.
Benjamin,

I can't answer your precise question, but this is definitely a viable use
case. MapR does the same thing in the OJAI interface to MapR DB. The
significant difference is that the Drill exec and the software handling the
DB bits are segregated into separate processes, and I think that there is
less account taken of precise data locality.

Some of the MapR engineers might comment.


Re: Embedding Drill as a distributed query engine

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
Hi Benjamin,

Your comments all make perfect sense. Using Spark is a great idea for long-running jobs, or those that need code, such as ML and so on. Drill works best for tasks that can be expressed in SQL. As you know, Spark needs to start a new JVM for each query so it can host job-specific bytecode. Drill, being SQL based, just compiles SQL to Java and executes all queries in the same JVM. Queries thus start faster and there is less overhead, which is what you want if you run many short-lived SQL queries. (Where short-lived means roughly "less than a minute.") In short, Drill and Spark solve different problems; if you have both sets of problems, you'll need both tools at your disposal.

A bit more about Drill. Drill does use Calcite. But, Drill is not a simple data layer on top of Calcite. Drill is a fully distributed, general-purpose big data engine, like Presto, Spark or Impala. Both Drill and Spark create multi-level DAGs.

When possible, aggregation will be done locally, but often that is not possible. Consider a simple sum of quantities sold, broken down by product:


SELECT prodCode, SUM(quantity)
FROM prodSales
GROUP BY prodCode

Although the first-level aggregation can be done locally on each node, the roll-up aggregation must be done by bringing like data together. Drill does this with hash partitioning: if you have N Drillbits, data from each Drillbit is exchanged with the other Drillbits, with roughly 1/N of each sender's data going to each receiver.
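
Concretely, each sender picks the receiver for a row along these lines; this is a simplified sketch of the idea, not Drill's actual exchange code:

// Rows with the same prodCode always hash to the same receiving Drillbit,
// so each Drillbit can compute the final totals for its share of the keys.
static int receiverFor(String prodCode, int numDrillbits) {
  return Math.floorMod(prodCode.hashCode(), numDrillbits);
}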

That way, aggregation is also distributed; there is no single "reducer" node. After aggregation is complete, all N Drillbits send their totals to the Foreman and then on to the client. So, one question is whether your DB nodes can handle the load of the exchanged data, or whether all this shuffling is better left to dedicated query nodes.


The above shows that Drill makes generous use of RPC to exchange data. Drill's RPC exchanges binary value vectors, and so is quite efficient. You mentioned your API is not performant. I wonder: are you using a compact binary format, or a more classic JSON-based REST API? In my experience, REST does not perform very well in this use case. The best RPC solution would be to copy binary data into a buffer, send it over the wire to Drill, and copy that binary data directly into vectors.
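
To illustrate that last point, here is a minimal sketch of copying wire data straight into a vector. The field, allocator and wireBuffer setup are omitted, and this illustrates the idea rather than Drill's actual RPC path:

// Copy 4-byte ints directly from a wire buffer (a ByteBuffer) into a
// Drill value vector (org.apache.drill.exec.vector.IntVector).
IntVector quantities = new IntVector(field, allocator);
quantities.allocateNew(rowCount);
for (int i = 0; i < rowCount; i++) {
  quantities.getMutator().setSafe(i, wireBuffer.getInt(i * 4));
}
quantities.getMutator().setValueCount(rowCount);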

The next issue is data volume. If your queries are selective, you'll ship lots of data to Drill, only for Drill to discard it. A better solution is to implement "filter push-down" to reduce the amount of data read from your DB. Do you have indexes? Shards? Key/value pairs? In any of those cases, you can write code in Drill to push predicates to your DB, in the form your DB needs, so you reduce the volume of data sent to Drill (or Spark, or whatever).

The same is true of projection push-down: ship off your DB node only the columns that Drill wants. Many "NoSQL" databases offer some form of this "data access" API; perhaps your DB does also.
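
As a rough sketch of the idea, the scan spec your plugin hands to the DB could carry both push-downs. DbScanSpec and its methods here are hypothetical stand-ins for whatever your DB's scan API looks like:

// Hypothetical DB-side scan spec carrying both push-downs.
DbScanSpec spec = new DbScanSpec("prodSales");
spec.projectColumns(Arrays.asList("prodCode", "quantity")); // projection push-down
spec.addPredicate("prodCode", "=", "P-1001");               // filter push-down
// The DB now ships only two columns of the matching rows,
// rather than the whole table.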


Apache Arrow is trying to do something similar. We've discussed having an Arrow integration in Drill, at least for reading data. Arrow might provide a useful filtering mechanism. But, unless your data is stored in columnar form, you'll pay the overhead of copying data to Arrow. On the Drill side, a copy into Drill vectors is needed whether the data arrives in Arrow or some other format.

A clever (though non-trivial) solution would be to load data in your DB into Drill vectors, use Drill RPC to ship that to a Drillbit, where Drill could use the data without the need for a copy. (This is what Arrow promises, but Drill is not based on Arrow.) That way, you benefit from Drill's already-built data transfer mechanism.


You also mentioned that context switches are costly. Tricky to debug that via e-mail. Are you sending only one row at a time perhaps? If so, you really want to send decent-sized batches: 1K or 4K rows, up to some reasonable message size such as 1 MB. Since Drill (and presumably your DB) are multi-threaded, there will be thread-switches even if they are in the same process. Drill uses large record batches to minimize its own thread switches. Presto, Impala, Spark and Hive do the same.
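
Here is a sketch of what the sending side might look like; Row, encodedSize() and send() are placeholders for your RPC layer, and the sizes are just the rough figures above:

// Accumulate rows into large batches; flush at a row-count or byte cap.
List<Row> batch = new ArrayList<>(4096);
for (Row row : rows) {
  batch.add(row);
  if (batch.size() >= 4096 || encodedSize(batch) >= 1 << 20) { // ~1 MB
    send(batch);
    batch.clear();
  }
}
if (!batch.isEmpty()) {
  send(batch); // flush the final partial batch
}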


So, maybe two options to consider:

1. A two-part solution in which your DB does the first-level projection/selection filtering and Drill does the remaining heavy lifting (joins, aggregations, etc.)

2. A library-based solution. I believe that the Calcite project does, in fact, have a simple data engine available as part of the project. I recall seeing some demo that reads CSV files (check out the Docs page). If you only need simple queries, and want to use Calcite, maybe this is an alternative. It won't scale like Drill (or Spark), but depending on your use case, it might be worth a look, if only to contrast with Drill.


Thanks,
- Paul

 


Re: Embedding Drill as a distributed query engine

Posted by Benjamin Schaff <be...@reactivecore.com>.
Thank you all for your quick answers; that's amazing responsiveness.

Let me address things as best I can, since it's a bit complex, so you can
provide me with some feedback.

The current SQL engine is based on Spark SQL and is embedded a little more
deeply than a custom datasource, so I am fully aware of the memory
constraints and everything related to them.

The issue I have with that model is the context switch between Spark's
internal format and our RPC layer, which is really costly for a still-unclear
reason. Using Calcite directly on a node to query just one partition gave us
really good performance with our RPC endpoint, so the reason I wanted to
embed Drill in the same JVM (which you can't do with Spark, because the
executors are forked on the fly) was to avoid that context switching. If
nobody thinks it's wise, feasible, or easy, I guess I could try the same
integration as Spark, but also provide a shared-memory API.

I also wanted to leverage embedded mode in the hope that partial
aggregations would be executed close to the data, avoiding a lot of
shuffling.

What are your thoughts?

Thanks in advance.

On Tue, Jan 21, 2020 at 8:10 PM Paul Rogers <pa...@yahoo.com.invalid>
wrote:

> Hi Benjamin,
>
> Very cool project! Drill works well on top of custom data sources.
>
> That said, I suspect that actually running Drill inside your process will
> lead to a large amount of complexity. Your comment focuses on code issues.
> However, there are larger concerns. Although we think of Drill as a simple
> single-threaded, single-node tool (when run in SqlLine or on a Mac), Drill
> is designed to be fully distributed.
>
> As queries get larger, you will find that Drill itself uses large amounts
> of memory and CPU to run a query quickly. (Imagine a join or sort of
> billions of rows from several tables.) Drill has its own memory management
> system to handle the large blocks of memory needed. Your DB also needs
> memory. You'd need a way to unify Drill's memory management with your own
> -- a daunting task.
>
> Grinding through billions of rows is CPU intensive. Drill manages its own
> threads and makes very liberal use of CPU. Your DB engine likely also has a
> threading model. Again, integrating the two is difficult. We could go on.
>
> In short, although Drill works well as a query engine on top of a custom
> data source, Drill itself is not designed to be a library included in
> your app process; it is designed to run as its own distributed set of
> processes running alongside your process.
>
> We could, of course, change the design, but that would be a bit of a big
> project because of the above issues. Might be interesting to think how
> you'd embed a distributed framework as a library in some host process. Not
> sure I've ever seen this done for any tool. (If anyone knows of an example,
> please let us know.)
>
>
> I wonder if there is a better solution. Run Drill alongside your DB on the
> same nodes. Have Drill then obtain data from your DB via an API. The quick
> & dirty solution is to use an RPC API. You can get fancy and use shared
> memory. A side benefit is that other tools can also use the API. For
> example, if you find you need Spark integration, it is easier to provide.
> (You can't, of course, run Spark in your DB process.)
>
> In this case, an "embedded solution" means that Drill is embedded in your
> app cluster (like ZK), not that it is embedded in your app process.
>
>
> In this way, you can tune Drill's memory and CPU usage separately from
> that of your engine, making the problem tractable. This model is, in fact,
> very similar to the traditional HDFS model in which both Drill and HDFS run
> on the same nodes. It is also similar to what MapR did with the MapR DB
> integration.
>
>
> Further, by separating the two, you can run Drill on its own nodes if you
> find your queries are getting larger and more expensive. That is, you can
> scale out by separating compute (Drill) from storage (your DB), allowing
> each to scale independently.
>
>
> And, of course, a failure in one engine (Drill or DB) won't take down the
> other if the two are in separate processes.
>
>
> In either case, your storage plugin needs to compute data locality. If
> your DB is distributed, then perhaps it has some scheme for distributing
> data: hash partitioning, range partitioning, or whatever. Somehow, if I
> have key 'x', I know to go to node Y to get that value. For example, in
> HDFS, Drill can distribute block scans to the node(s) with the blocks.
>
>
> Or, maybe data is randomly distributed, so that every scan must run
> against every DB node; in which case if you have N nodes, you'll run N
> scans and each will find whatever it happens to contain.
>
>
> If your DB has N nodes, then you need to distribute work to those nodes by
> telling Drill that the max parallelization (reported by the group scan) is
> N. Then, Drill will ask you for the SubScan for each of the N scans, and
> you can allocate work to those nodes. Either by subsetting the scan (as in
> HDFS) or just running the same scan everywhere.
>
>
> If you go with the two-process model, then your storage plugin can use
> soft affinity: run the scan on the node that has your DB, else run it on
> any node and use an RPC to obtain the data. This is how Drill works if it
> runs on a subset of HDFS nodes.
>
> You also asked about the Foreman. At present, Drill assumes nodes are
> homogeneous: all nodes evenly share work, including the work of the
> Foreman. Impala, for example, has added a feature to dedicate some nodes to
> be only coordinators (the equivalent of Drill's Foreman). Drill does not
> yet have that feature.
>
> Without the homogeneity assumption, Drill would need some kind of work
> scheduler to know to give less work to the Foreman + Drillbit node and more
> work to the Drillbit-only nodes. Having Foreman-only nodes would keep
> things simpler. In your case, such a Foreman would have to reside on a node
> other than one of your DB nodes to keep the DB nodes symmetrical.
>
>
> The above is a high-level survey of the challenges. We'd be happy to
> discuss specific issues as you refine your design.
>
>
> Thanks,
> - Paul


Re: Embedding Drill as a distributed query engine

Posted by Benjamin Schaff <be...@reactivecore.com>.
Hi,

Yeah, as a side note, I had to fork Drill to update the netty jars as well,
so that's done in my fork to the best of my understanding.
There is already a ticket about that issue:
https://issues.apache.org/jira/browse/DRILL-7546
It seems to be the same problem described there.

Thanks. I hope I will be able to open-source it when it's more
production-ready and the company is able to do so.

> On Wed, Jan 22, 2020 at 9:28 AM Benjamin Schaff <
> benjamin.schaff@reactivecore.com> wrote:
>
> > Hi, thanks everyone for the feedback.
> >
> > The current database query API supports pushdowns (filtering and
> > projections), but when dealing with billions of rows, it's still a lot to
> > move over the network.
> > The RPC API itself is not the performance bottleneck; we have our own
> > binary format, similar to FlatBuffers, with query-time codegen readers and
> > writers, so that part is OK.
> >
> > On the question of why the Spark part is kind of slow: I do batch
> > (usually around 50k rows at a time), but my guess is that going from our
> > binary format to Spark's internal row format, after which Spark moves it
> > to UnsafeRow, is a lot of transformation for "nothing".
> > We have a codegen parser that converts our internal format to Spark's row
> > format, but speaking UnsafeRow directly is much more involved, so I put
> > it aside for now.
> >
> > Here is what I am going to try, based on all the feedback you gave me:
> > 1) Since premature optimization is the root of all evil, and my Spark
> > assumption might not hold true for Drill, I will try to do a "remote"
> > integration.
> > 2) I will try to see if I can use Drill's internal format to ship data on
> > the network; if anybody could be kind enough to give me a pointer on
> > where to look, that would be awesome.
> > 3) I will upgrade my current integration to merge the "remote" with the
> > "local" one.
> >
> > I will keep you guys updated and publish my results so that I can give
> > back some of my experiments.
> >
> > On a separate note, I was wondering if/how it is possible for Drill to
> > push down the filter parts of joins (probably by hacking somewhere in the
> > Calcite plan), or if that is done automatically.
> >
> > Again, any idea or comment is welcome.
> >
> > Thanks.
> >
> > On Wed, Jan 22, 2020 at 1:28 AM Ted Dunning <te...@gmail.com> wrote:
> >
> >> Hmmm....
> >>
> >> I disagree with a lot of what Paul says.
> >>
> >> Here is where I agree fully:
> >>
> >> 1) Collocating processes in the same JVM increases the blast radius of
> >> failures. If either the DB or the Drill threads go south, it will take the
> >> other out. This is a relatively low-probability event, but increasing the
> >> probability, or, worse, coupling the probabilities isn't necessary. On a
> >> very closely related note, the blast radius of GC is also coupled between
> >> the two processes.
> >>
> >> 2) Lack of control over either process or memory for either process will
> >> affect the other. That would be bad. See (1).
> >>
> >> 3) Coupled scaling is sub-optimal. But that might be compensated for by
> >> the close coupling of within-process communication.
> >>
> >> Where I disagree is how serious these considerations are. Drill is fairly
> >> well disciplined in terms of heap and off-heap space. Presumably the DB is
> >> as well. That would mean that the likely impact of (2) would be very small.
> >> The ease of communication between threads within the same process is
> >> dramatically better than communication between processes, even
> >> (especially?) with shared memory.
> >>
> >> My own recommendation would be to *allow* collocation but not assume it.
> >> Allow for non-collocated Drill bits as well. That allows you to pivot at
> >> any point.
> >>
> >>
> >> On the other hand

Re: Embedding Drill as a distributed query engine

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
Hi Benjamin,

Thanks much for the update. Congrats on getting everything working. Glad you did not end up in jar conflict hell.

If you can, please file a JIRA ticket about the endpoint map issue. The situation you describe seems wrong. Perhaps the bug crept in as people added information to the endpoint. In any event, we should fix the issue.

I look forward to the link to see how you did the integration.

Thanks,
- Paul

 


Re: Embedding Drill as a distributed query engine

Posted by Benjamin Schaff <be...@reactivecore.com>.
Hi,

Just wanted to give some feedback: I finally got a chance to work on that
part of our database.
I successfully integrated Drill as the SQL engine directly inside the
database, with partition placement, and it gave me the lift I expected over
the old version that was using an external Spark cluster.
So right now, the "storage" nodes and drillbits are in the same JVM and can
read the data directly.
I am not seeing any issues as of now, but I am continuing the benchmarks and
trying to get better test coverage.

One thing that was a bit of a struggle, and that I partially fixed in
Drill, is that DrillbitEndpoints are used as HashMap keys and their hashcode
includes the node's state field. That sometimes duplicates the endpoints,
which gave me issues with the "hard" affinity mode and the "required"
endpoint flag: an endpoint in the STARTUP state gets mixed up with the same
endpoint in the ONLINE state, even though they are one and the same node.
Unfortunately, since I don't know the internals of Drill, my patch only
covers my use case, but if it is of any interest, I could contribute a
proper fix.
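
For anyone who hits the same thing, the workaround amounts to keying maps on
a state-free copy of the endpoint. A minimal sketch, assuming the
protobuf-generated DrillbitEndpoint with its toBuilder()/clearState()
methods (the exact names are my reading of the generated code, so treat them
as an assumption):

import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

public final class EndpointKeys {

  private EndpointKeys() {}

  // The generated hashCode()/equals() include the state field, so the same
  // physical drillbit seen first in STARTUP and later in ONLINE shows up as
  // two distinct HashMap keys. Clearing state yields a stable identity.
  public static DrillbitEndpoint identityOf(DrillbitEndpoint endpoint) {
    return endpoint.toBuilder().clearState().build();
  }
}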

My company is considering open-sourcing our product; if anyone is
interested, I will share the link whenever it is available, as an example of
how to do this.

Thanks everyone for your help and suggestions.



Re: Embedding Drill as a distributed query engine

Posted by Benjamin Schaff <be...@reactivecore.com>.
Hi, thanks everyone for the feedback.

The current database query API supports pushdowns (filtering and
projections), but when dealing with billions of rows it is still a lot to
move over the network.
The RPC API itself is not the performance bottleneck; we have our own binary
format, similar to FlatBuffers, with query-time codegen readers and writers,
so that part is fine.

On the question of why the Spark part is kind of slow: I do batch (usually
around 50k rows at a time), but my guess is that going from our binary
format to Spark's internal row format, which Spark then converts to
UnsafeRow, is a lot of transformation for "nothing".
We have a codegen parser that converts our internal format to Spark's row
format, but writing UnsafeRow directly is much more involved, so I have set
that aside for now.

Here is what I am going to try, based on all the feedback you gave me:
1) Since premature optimization is the root of all evil, and my Spark
assumption might not hold true for Drill, I will try a "remote" integration
first.
2) I will see if I can use Drill's internal format to ship data over the
network; if anybody could be kind enough to give me a pointer on where to
look, that would be awesome (the sketch below shows the direction I am
exploring).
3) I will upgrade my current integration to merge the "remote" approach with
the "local" one.

I will keep you all updated and publish my results so that I can give back
some of what I learn from these experiments.
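
For point 2, the direction I am exploring looks roughly like the loop below:
filling Drill's value vectors through the row-set writer that a plugin's
batch reader gets handed. A sketch only, assuming the RowSetLoader writer
API; DbCursor and DbRow are made-up stand-ins for our own cursor.

import org.apache.drill.exec.physical.resultSet.RowSetLoader;

class BatchFiller {

  // Copy rows from the database cursor into the current vector batch,
  // stopping when Drill reports the batch is full.
  boolean fillBatch(RowSetLoader rowWriter, DbCursor cursor) {
    while (!rowWriter.isFull() && cursor.hasNext()) {
      DbRow row = cursor.next();
      rowWriter.start();                           // begin a new row
      rowWriter.scalar("id").setLong(row.id());
      rowWriter.scalar("name").setString(row.name());
      rowWriter.save();                            // commit it to the batch
    }
    return cursor.hasNext();                       // true => more batches needed
  }

  // Hypothetical stand-ins for the database's own iteration API.
  interface DbCursor { boolean hasNext(); DbRow next(); }
  interface DbRow { long id(); String name(); }
}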

On a separate note, I was wondering whether and how it is possible for
Drill to push down the filter parts of joins (probably by hacking somewhere
in the Calcite plan), or if that is done automatically.

Again, any idea or comment is welcome.

Thanks.


Re: Embedding Drill as a distributed query engine

Posted by Ted Dunning <te...@gmail.com>.
Hmmm....

I disagree with a lot of what Paul says.

Here is where I agree fully:

1) collocating the two engines in the same JVM increases the blast radius of
failures. If either the DB or the Drill threads go south, they will take the
other out. This is a relatively low-probability event, but increasing the
probability, or, worse, coupling the probabilities isn't necessary. On a
very closely related note, the blast radius of GC is also coupled between
the two engines.

2) lack of control over either CPU or memory for either engine will
affect the other. That would be bad. See (1).

3) coupled scaling is sub-optimal. But that might be compensated for by the
close coupling of within-process communication.

Where I disagree is how serious these considerations are. Drill is fairly
well disciplined in terms of heap and off-heap space. Presumably the DB is
as well. That would mean that the likely impact of (2) would be very small.
The ease of communication between threads within the same process is
dramatically better than communication between processes, even
(especially?) with shared memory.

My own recommendation would be to *allow* collocation but not assume it.
Allow for non-collocated Drill bits as well. That allows you to pivot at
any point.


On the other hand


Re: Embedding Drill as a distributed query engine

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
Hi Benjamin,

Very cool project! Drill works well on top of custom data sources.

That said, I suspect that actually running Drill inside your process will lead to a large amount of complexity. Your comment focuses on code issues. However, there are larger concerns. Although we think of Drill as a simple single-threaded, single-node tool (when run in SqlLine or on a Mac), Drill is designed to be fully distributed.

As queries get larger, you will find that Drill itself uses large amounts of memory and CPU to run a query quickly. (Imagine a join or sort of billions of rows from several tables.) Drill has its own memory management system to handle the large blocks of memory needed. Your DB also needs memory. You'd need a way to unify Drill's memory management with your own -- a daunting task.

Grinding through billions of rows is CPU intensive. Drill manages its own threads and makes very liberal use of CPU. Your DB engine likely also has a threading model. Again, integrating the two is difficult. We could go on.

In short, although Drill works well as a query engine on top of a custom data source, Drill itself is not designed to be a library included in your app process; it is designed to run as its own distributed set of processes alongside yours.

We could, of course, change the design, but that would be a bit of a big project because of the above issues. Might be interesting to think how you'd embed a distributed framework as a library in some host process. Not sure I've ever seen this done for any tool. (If anyone knows of an example, please let us know.)


I wonder if there is a better solution. Run Drill alongside your DB on the same nodes. Have Drill then obtain data from your DB via an API. The quick & dirty solution is to use an RPC API. You can get fancy and use shared memory. A side benefit is that other tools can also use the API. For example, if you find you need Spark integration, it is easier to provide. (You can't, of course, run Spark in your DB process.)

In this case, an "embedded solution" means that Drill is embedded in your app cluster (like ZK), not that it is embedded in your app process.


In this way, you can tune Drill's memory and CPU usage separately from that of your engine, making the problem tractable. This model is, in fact, very similar to the traditional HDFS model in which both Drill and HDFS run on the same nodes. It is also similar to what MapR did with the MapR DB integration.


Further, by separating the two, you can run Drill on its own nodes if you find your queries are getting larger and more expensive. That is, you can scale out by separating compute (Drill) from storage (your DB), allowing each to scale independently.


And, of course, a failure in one engine (Drill or DB) won't take down the other if the two are in separate processes.


In either case, your storage plugin needs to compute data locality. If your DB is distributed, then perhaps it has some scheme for distributing data: hash partitioning, range partitioning, or whatever. Somehow, if I have key 'x', I know to go to node Y to get that value. For example, in HDFS, Drill can distribute block scans to the node(s) with the blocks.
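
To give a feel for it, the group scan reports that key-to-node mapping to the planner as a list of endpoint affinities (getOperatorAffinity() in the plugin API, as I recall). A rough sketch only; PartitionMap is a made-up stand-in for whatever partition metadata your DB keeps, while EndpointAffinity and DrillbitEndpoint are Drill's types:

import java.util.ArrayList;
import java.util.List;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

class LocalitySketch {

  // Higher affinity asks the planner to prefer scheduling scan fragments on
  // that drillbit; with soft affinity it is a preference, not a requirement.
  List<EndpointAffinity> operatorAffinity(List<DrillbitEndpoint> endpoints,
                                          PartitionMap partitions) {
    List<EndpointAffinity> result = new ArrayList<>();
    for (DrillbitEndpoint ep : endpoints) {
      double share = partitions.fractionStoredOn(ep.getAddress());
      if (share > 0) {
        result.add(new EndpointAffinity(ep, share));
      }
    }
    return result;
  }

  // Hypothetical: fraction of the table stored on a given host.
  interface PartitionMap {
    double fractionStoredOn(String host);
  }
}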


Or, maybe data is randomly distributed, so that every scan must run against every DB node; in which case if you have N nodes, you'll run N scans and each will find whatever it happens to contain.


If your DB has N nodes, then you need to distribute work to those nodes by telling Drill that the max parallelization (reported by the group scan) is N. Then, Drill will ask you for the SubScan for each of the N scans, and you can allocate work to those nodes, either by subsetting the scan (as in HDFS) or by just running the same scan everywhere.
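
Concretely, those hooks mirror these group-scan methods. This is a sketch, not a complete plugin: MySubScan and Partition are placeholders, and it assumes a one-to-one mapping of partitions to minor fragments:

import java.util.List;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

class ParallelizationSketch {

  private final List<Partition> partitions;   // one unit of work per partition
  private List<DrillbitEndpoint> assignments; // minor fragment -> drillbit

  ParallelizationSketch(List<Partition> partitions) {
    this.partitions = partitions;
  }

  // Upper bound on scan fan-out: at most one minor fragment per partition.
  public int getMaxParallelizationWidth() {
    return partitions.size();
  }

  // The planner calls this with one endpoint per minor fragment.
  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
    this.assignments = endpoints;
  }

  // Called once per minor fragment to produce the SubScan it will execute.
  public MySubScan getSpecificScan(int minorFragmentId) {
    return new MySubScan(partitions.get(minorFragmentId),
        assignments.get(minorFragmentId));
  }

  static class Partition {}                   // placeholder
  static class MySubScan {                    // placeholder
    MySubScan(Partition partition, DrillbitEndpoint endpoint) {}
  }
}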


If you go with the two-process model, then your storage plugin can use soft affinity: run the scan on the node that has your DB, else run it on any node and use an RPC to obtain the data. This is how Drill works if it runs on a subset of HDFS nodes.

You also asked about the Foreman. At present, Drill assumes nodes are homogeneous: all nodes evenly share work, including the work of the Foreman. Impala, for example, has added a feature to dedicate some nodes to be only coordinators (the equivalent of Drill's Foreman). Drill does not yet have that feature.

Without the homogeneity assumption, Drill would need some kind of work scheduler that knows to give less work to the Foreman + Drillbit node and more work to the Drillbit-only nodes. Having Foreman-only nodes would keep things simpler. In your case, such a Foreman would have to reside on a node other than one of your DB nodes to keep the DB nodes symmetrical.


The above is a high-level survey of the challenges. We'd be happy to discuss specific issues as you refine your design.


Thanks,
- Paul

 
