Posted to user@cassandra.apache.org by Frank Hughes <fr...@gmail.com> on 2017/03/15 10:38:56 UTC

TransportException - Consistency LOCAL_ONE - EC2

Hi there,

I'm running a Java process on a 4-node Cassandra 3.9 cluster on EC2 (instance type t2.2xlarge), with the process running separately on each of the nodes (i.e. 4 running JVMs).
The process is only doing reads from Cassandra to build a Solr index, using the Java driver with consistency level LOCAL_ONE.
However, the following exception is thrown:

com.datastax.driver.core.exceptions.TransportException: [/10.0.0.2:9042] Connection has been closed
        at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38)
        at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
        at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
        at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:269)
        at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:143)

where 10.0.0.2 is not the local machine. So my questions:

- Should this happen when I'm using consistency level LOCAL_ONE and just doing reads?
- Does this suggest non-local reads are happening?

Many thanks for any help/ideas.

Frank
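
For context, a minimal sketch of how LOCAL_ONE is typically set with the 3.x Java driver; the contact point, keyspace, and query below are placeholders rather than details from this thread:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class LocalOneReadSketch {
    public static void main(String[] args) {
        // Default every request to LOCAL_ONE via QueryOptions.
        Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")   // placeholder contact point
                .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_ONE))
                .build();
        try {
            Session session = cluster.connect("my_keyspace");   // placeholder keyspace

            // Consistency can also be set per statement if only the reads should be LOCAL_ONE.
            SimpleStatement stmt = new SimpleStatement("SELECT id FROM my_table");  // placeholder query
            stmt.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
            ResultSet rs = session.execute(stmt);
            for (Row row : rs) {
                System.out.println(row);
            }
        } finally {
            cluster.close();
        }
    }
}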



Re: TransportException - Consistency LOCAL_ONE - EC2

Posted by Ryan Svihla <rs...@foundev.pro>.
Give it a try and see how it behaves.

On Mar 15, 2017 10:09 AM, "Frank Hughes" <fr...@gmail.com> wrote:

> Thanks Ryan, appreciated again. getPolicy just had this:
>
> Policy policy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.
> builder().build());
>
> so i guess i need
>
> Policy policy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build(),
> false);
>
> Frank
>
> On 2017-03-15 13:45 (-0000), Ryan Svihla <rs...@foundev.pro> wrote:
> > I don't see what getPolicy is retrieving but you want to use TokenAware
> > with the shuffle false option in the ctor, it defaults to shuffle true so
> > that load is spread when people have horribly fat partitions.
> >
> > On Wed, Mar 15, 2017 at 9:41 AM, Frank Hughes <fr...@gmail.com>
> > wrote:
> >
> > > Thanks for reply. Much appreciated.
> > >
> > > I should have included more detail. So I am using replication factor 2,
> > > and the code is using a token aware method of distributing the work so
> that
> > > only data that is primarily owned by the node is read on that local
> > > machine. So i guess this points to the logic im using to determine
> what is
> > > primarily owned by a node. I guess this is verging into something that
> > > should be posted to the java driver list, but i'll post here in case
> its
> > > useful or theres an obvious problem:
> > >
> > > PoolingOptions poolingOpts = new PoolingOptions();
> > > poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE,
> this.coreConn);
> > > poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE,
> this.maxConn);
> > > poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768);
> > > poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
> > >
> > > SocketOptions socketOptions = new SocketOptions();
> > > socketOptions.setReadTimeoutMillis(15000);
> > >
> > > Cluster.Builder builder = Cluster.builder();
> > > for(String contactPoint: contactPoints){
> > >     builder.addContactPoint(contactPoint.trim());
> > >     builder.withPoolingOptions(poolingOpts);
> > >     builder.withSocketOptions(socketOptions);
> > > }
> > >
> > > builder.withLoadBalancingPolicy(getPolicy())
> > >         .withQueryOptions(new QueryOptions()
> > >                 .setPrepareOnAllHosts(true)
> > >                 .setMetadataEnabled(true)
> > >         );
> > >
> > > Cluster cluster = builder.build();
> > > Metadata metadata = cluster.getMetadata();
> > > Session session = cluster.connect(keyspaceName);
> > > Set<Host> allHosts = metadata.getAllHosts();
> > > int numberOfHost = 4;
> > >
> > > Host localHost = null;
> > > for (Host host : allHosts) {
> > >     if(host.getAddress().getHostAddress().equalsIgnoreCase(local))
> > >         localHost = host;
> > > }
> > >
> > > Map<Host, List<TokenRange>> replicaCount = new HashMap<Host,
> > > List<TokenRange>>();
> > > TokenRange[] tokenRanges = unwrapTokenRanges(metadata.
> getTokenRanges()).toArray(new
> > > TokenRange[0]);
> > >
> > > List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges);
> > > tokenRangeList.sort(new Comparator<TokenRange>() {
> > >     @Override
> > >     public int compare(TokenRange o1, TokenRange o2) {
> > >         return o1.getStart().compareTo(o2.getStart());
> > >     }
> > > });
> > >
> > > int numberOfHost = metadata.getAllHosts().size();
> > > int rangesPerHost = tokenRanges.length / numberOfHost;
> > >
> > > for(TokenRange tokenRange : tokenRangeList){
> > >
> > >     Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange);
> > >
> > >     String rangeHosts = "";
> > >     Iterator<Host> iter = hosts.iterator();
> > >     while(iter.hasNext()){
> > >         Host host = iter.next();
> > >
> > >         List<TokenRange> tokenRangesForHost = replicaCount.get(host);
> > >         if(tokenRangesForHost == null){
> > >             tokenRangesForHost = new ArrayList<TokenRange>();
> > >         }
> > >
> > >         if(tokenRangesForHost.size() < rangesPerHost ||
> !iter.hasNext()){
> > >             tokenRangesForHost.add(tokenRange);
> > >             replicaCount.put(host, tokenRangesForHost);
> > >             break;
> > >         }
> > >
> > >         rangeHosts += host.getAddress().toString();
> > >     }
> > > }
> > >
> > > for(Host replica : replicaCount.keySet()){
> > >     List<TokenRange> allocatedRanges = replicaCount.get(replica);
> > >     for(TokenRange tr : replicaCount.get(replica)){
> > >         System.out.println(tr.getStart() + " to " + tr.getEnd());
> > >     }
> > > }
> > >
> > > //get a list of token ranges for this host
> > > List<TokenRange> tokenRangesForHost = replicaCount.get(localHost);
> > >
> > > Again, any thoughts are much appreciated.
> > >
> > > Thanks
> > >
> > > Frank
> > >
> > >
> > > On 2017-03-15 12:38 (-0000), Ryan Svihla <rs...@foundev.pro> wrote:
> > > > LOCAL_ONE just means local to the datacenter by default the
> tokenaware
> > > > policy will go to a replica that owns that data (primary or any
> replica
> > > > depends on the driver) and that may or may not be the node the driver
> > > > process is running on.
> > > >
> > > > So to put this more concretely if you have RF 2 with that 4 node
> cluster
> > > so
> > > > 2 nodes will be responsible for that data and if your local process
> is
> > > not
> > > > running on one of those 2 nodes it will definitely HAVE to go to
> another
> > > > node.
> > > >
> > > > Therefore, if you wanted to pin behavior to a local replica you'd
> have to
> > > > send your work out in a token aware fashion where said work only
> goes to
> > > > the primary token owner of that data, and remove any shuffling of
> > > replicas
> > > > in the process (is only on by default in the java driver to my
> > > knowledge).
> > > >
> > > > On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <
> frankhughes782@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi there,
> > > > >
> > > > > Im running a java process on a 4 node cassandra 3.9 cluster on EC2
> > > > > (instance type t2.2xlarge), the process running separately on each
> of
> > > the
> > > > > nodes (i.e. 4 running JVMs).
> > > > > The process is just doing reads from Cassandra and building a SOLR
> > > index
> > > > > and using the java driver with consistency level LOCAL_ONE.
> > > > > However, the following exception is through:
> > > > >
> > > > > com.datastax.driver.core.exceptions.TransportException: [/
> > > 10.0.0.2:9042]
> > > > > Connection has been closed
> > > > >         at com.datastax.driver.core.exceptions.TransportException.
> > > > > copy(TransportException.java:38)
> > > > >         at com.datastax.driver.core.exceptions.TransportException.
> > > > > copy(TransportException.java:24)
> > > > >         at com.datastax.driver.core.DriverThrowables.
> propagateCause(
> > > > > DriverThrowables.java:37)
> > > > >         at com.datastax.driver.core.ArrayBackedResultSet$
> > > > > MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
> > > > >         at com.datastax.driver.core.ArrayBackedResultSet$
> > > > > MultiPage.isExhausted(ArrayBackedResultSet.java:269)
> > > > >         at com.datastax.driver.core.ArrayBackedResultSet$1.
> > > > > hasNext(ArrayBackedResultSet.java:143)
> > > > >
> > > > > where 10.0.0.2 is not the local machine. So my questions:
> > > > >
> > > > > - Should this happen when Im using consistency level LOCAL_ONE and
> just
> > > > > doing reads ?
> > > > > - Does this suggest non-local reads are happening ?
> > > > >
> > > > > Many thanks for any help/ideas.
> > > > >
> > > > > Frank
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Thanks,
> > > > Ryan Svihla
> > > >
> > >
> >
> >
> >
> > --
> >
> > Thanks,
> > Ryan Svihla
> >
>

Re: TransportException - Consistency LOCAL_ONE - EC2

Posted by Frank Hughes <fr...@gmail.com>.
Thanks Ryan, appreciated again. getPolicy just had this:

LoadBalancingPolicy policy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build());

so I guess I need

LoadBalancingPolicy policy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build(), false);

Frank
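
A minimal sketch of what a complete getPolicy() could look like with replica shuffling disabled; the classes and builder calls are from the 3.x driver, but the method shape and the local DC name "datacenter1" are placeholders, not taken from the thread:

import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;

public class PolicyFactory {
    // Wrap a DC-aware child policy and disable replica shuffling so the query plan
    // tries the replicas of a partition in a fixed order (primary first) instead of
    // a shuffled order.
    public static LoadBalancingPolicy getPolicy() {
        return new TokenAwarePolicy(
                DCAwareRoundRobinPolicy.builder()
                        .withLocalDc("datacenter1")   // placeholder DC name; optional
                        .build(),
                false);                               // shuffleReplicas = false
    }
}

Disabling shuffling is what makes it feasible to pin each worker's reads to the primary owner of the data it asks for.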

On 2017-03-15 13:45 (-0000), Ryan Svihla <rs...@foundev.pro> wrote: 
> I don't see what getPolicy is retrieving but you want to use TokenAware
> with the shuffle false option in the ctor, it defaults to shuffle true so
> that load is spread when people have horribly fat partitions.
> 
> On Wed, Mar 15, 2017 at 9:41 AM, Frank Hughes <fr...@gmail.com>
> wrote:
> 
> > Thanks for reply. Much appreciated.
> >
> > I should have included more detail. So I am using replication factor 2,
> > and the code is using a token aware method of distributing the work so that
> > only data that is primarily owned by the node is read on that local
> > machine. So i guess this points to the logic im using to determine what is
> > primarily owned by a node. I guess this is verging into something that
> > should be posted to the java driver list, but i'll post here in case its
> > useful or theres an obvious problem:
> >
> > PoolingOptions poolingOpts = new PoolingOptions();
> > poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, this.coreConn);
> > poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, this.maxConn);
> > poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768);
> > poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
> >
> > SocketOptions socketOptions = new SocketOptions();
> > socketOptions.setReadTimeoutMillis(15000);
> >
> > Cluster.Builder builder = Cluster.builder();
> > for(String contactPoint: contactPoints){
> >     builder.addContactPoint(contactPoint.trim());
> >     builder.withPoolingOptions(poolingOpts);
> >     builder.withSocketOptions(socketOptions);
> > }
> >
> > builder.withLoadBalancingPolicy(getPolicy())
> >         .withQueryOptions(new QueryOptions()
> >                 .setPrepareOnAllHosts(true)
> >                 .setMetadataEnabled(true)
> >         );
> >
> > Cluster cluster = builder.build();
> > Metadata metadata = cluster.getMetadata();
> > Session session = cluster.connect(keyspaceName);
> > Set<Host> allHosts = metadata.getAllHosts();
> > int numberOfHost = 4;
> >
> > Host localHost = null;
> > for (Host host : allHosts) {
> >     if(host.getAddress().getHostAddress().equalsIgnoreCase(local))
> >         localHost = host;
> > }
> >
> > Map<Host, List<TokenRange>> replicaCount = new HashMap<Host,
> > List<TokenRange>>();
> > TokenRange[] tokenRanges = unwrapTokenRanges(metadata.getTokenRanges()).toArray(new
> > TokenRange[0]);
> >
> > List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges);
> > tokenRangeList.sort(new Comparator<TokenRange>() {
> >     @Override
> >     public int compare(TokenRange o1, TokenRange o2) {
> >         return o1.getStart().compareTo(o2.getStart());
> >     }
> > });
> >
> > int numberOfHost = metadata.getAllHosts().size();
> > int rangesPerHost = tokenRanges.length / numberOfHost;
> >
> > for(TokenRange tokenRange : tokenRangeList){
> >
> >     Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange);
> >
> >     String rangeHosts = "";
> >     Iterator<Host> iter = hosts.iterator();
> >     while(iter.hasNext()){
> >         Host host = iter.next();
> >
> >         List<TokenRange> tokenRangesForHost = replicaCount.get(host);
> >         if(tokenRangesForHost == null){
> >             tokenRangesForHost = new ArrayList<TokenRange>();
> >         }
> >
> >         if(tokenRangesForHost.size() < rangesPerHost || !iter.hasNext()){
> >             tokenRangesForHost.add(tokenRange);
> >             replicaCount.put(host, tokenRangesForHost);
> >             break;
> >         }
> >
> >         rangeHosts += host.getAddress().toString();
> >     }
> > }
> >
> > for(Host replica : replicaCount.keySet()){
> >     List<TokenRange> allocatedRanges = replicaCount.get(replica);
> >     for(TokenRange tr : replicaCount.get(replica)){
> >         System.out.println(tr.getStart() + " to " + tr.getEnd());
> >     }
> > }
> >
> > //get a list of token ranges for this host
> > List<TokenRange> tokenRangesForHost = replicaCount.get(localHost);
> >
> > Again, any thoughts are much appreciated.
> >
> > Thanks
> >
> > Frank
> >
> >
> > On 2017-03-15 12:38 (-0000), Ryan Svihla <rs...@foundev.pro> wrote:
> > > LOCAL_ONE just means local to the datacenter by default the tokenaware
> > > policy will go to a replica that owns that data (primary or any replica
> > > depends on the driver) and that may or may not be the node the driver
> > > process is running on.
> > >
> > > So to put this more concretely if you have RF 2 with that 4 node cluster
> > so
> > > 2 nodes will be responsible for that data and if your local process is
> > not
> > > running on one of those 2 nodes it will definitely HAVE to go to another
> > > node.
> > >
> > > Therefore, if you wanted to pin behavior to a local replica you'd have to
> > > send your work out in a token aware fashion where said work only goes to
> > > the primary token owner of that data, and remove any shuffling of
> > replicas
> > > in the process (is only on by default in the java driver to my
> > knowledge).
> > >
> > > On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <fr...@gmail.com>
> > > wrote:
> > >
> > > > Hi there,
> > > >
> > > > Im running a java process on a 4 node cassandra 3.9 cluster on EC2
> > > > (instance type t2.2xlarge), the process running separately on each of
> > the
> > > > nodes (i.e. 4 running JVMs).
> > > > The process is just doing reads from Cassandra and building a SOLR
> > index
> > > > and using the java driver with consistency level LOCAL_ONE.
> > > > However, the following exception is through:
> > > >
> > > > com.datastax.driver.core.exceptions.TransportException: [/
> > 10.0.0.2:9042]
> > > > Connection has been closed
> > > >         at com.datastax.driver.core.exceptions.TransportException.
> > > > copy(TransportException.java:38)
> > > >         at com.datastax.driver.core.exceptions.TransportException.
> > > > copy(TransportException.java:24)
> > > >         at com.datastax.driver.core.DriverThrowables.propagateCause(
> > > > DriverThrowables.java:37)
> > > >         at com.datastax.driver.core.ArrayBackedResultSet$
> > > > MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
> > > >         at com.datastax.driver.core.ArrayBackedResultSet$
> > > > MultiPage.isExhausted(ArrayBackedResultSet.java:269)
> > > >         at com.datastax.driver.core.ArrayBackedResultSet$1.
> > > > hasNext(ArrayBackedResultSet.java:143)
> > > >
> > > > where 10.0.0.2 is not the local machine. So my questions:
> > > >
> > > > - Should this happen when Im using consistency level LOCAL_ONE and just
> > > > doing reads ?
> > > > - Does this suggest non-local reads are happening ?
> > > >
> > > > Many thanks for any help/ideas.
> > > >
> > > > Frank
> > > >
> > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Thanks,
> > > Ryan Svihla
> > >
> >
> 
> 
> 
> -- 
> 
> Thanks,
> Ryan Svihla
> 

Re: TransportException - Consistency LOCAL_ONE - EC2

Posted by Ryan Svihla <rs...@foundev.pro>.
I don't see what getPolicy is returning, but you want to use TokenAwarePolicy
with the shuffle-replicas option set to false in the ctor; it defaults to
shuffle true so that load is spread when people have horribly fat partitions.
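
One way to see how it behaves with shuffling disabled is to ask the load balancing policy directly for its query plan for a statement that carries a routing key. This is only a rough diagnostic sketch; the contact point, keyspace, table, and the bigint partition key are placeholder assumptions:

import java.nio.ByteBuffer;
import java.util.Iterator;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;

public class QueryPlanCheck {
    public static void main(String[] args) {
        LoadBalancingPolicy policy = new TokenAwarePolicy(
                DCAwareRoundRobinPolicy.builder().build(), false);   // shuffleReplicas = false
        Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")        // placeholder contact point
                .withLoadBalancingPolicy(policy)
                .build();
        try {
            cluster.connect("my_keyspace");          // forces init so the policy knows the hosts

            // Give the statement an explicit routing key so the token-aware policy can use it;
            // the bigint partition key value 42 is a placeholder.
            ByteBuffer routingKey = TypeCodec.bigint().serialize(42L, ProtocolVersion.V4);
            SimpleStatement stmt = new SimpleStatement("SELECT id FROM my_table WHERE id = 42");
            stmt.setRoutingKey(routingKey);

            // The first host in the plan is the coordinator the driver will try first.
            Iterator<Host> plan = policy.newQueryPlan("my_keyspace", stmt);
            while (plan.hasNext()) {
                System.out.println("would try: " + plan.next().getAddress());
            }
        } finally {
            cluster.close();
        }
    }
}

With shuffleReplicas set to false the plan should start with the same replica for a given key on every run; with the default it can start at any replica of that key.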

On Wed, Mar 15, 2017 at 9:41 AM, Frank Hughes <fr...@gmail.com>
wrote:

> Thanks for reply. Much appreciated.
>
> I should have included more detail. So I am using replication factor 2,
> and the code is using a token aware method of distributing the work so that
> only data that is primarily owned by the node is read on that local
> machine. So i guess this points to the logic im using to determine what is
> primarily owned by a node. I guess this is verging into something that
> should be posted to the java driver list, but i'll post here in case its
> useful or theres an obvious problem:
>
> PoolingOptions poolingOpts = new PoolingOptions();
> poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, this.coreConn);
> poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, this.maxConn);
> poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768);
> poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
>
> SocketOptions socketOptions = new SocketOptions();
> socketOptions.setReadTimeoutMillis(15000);
>
> Cluster.Builder builder = Cluster.builder();
> for(String contactPoint: contactPoints){
>     builder.addContactPoint(contactPoint.trim());
>     builder.withPoolingOptions(poolingOpts);
>     builder.withSocketOptions(socketOptions);
> }
>
> builder.withLoadBalancingPolicy(getPolicy())
>         .withQueryOptions(new QueryOptions()
>                 .setPrepareOnAllHosts(true)
>                 .setMetadataEnabled(true)
>         );
>
> Cluster cluster = builder.build();
> Metadata metadata = cluster.getMetadata();
> Session session = cluster.connect(keyspaceName);
> Set<Host> allHosts = metadata.getAllHosts();
> int numberOfHost = 4;
>
> Host localHost = null;
> for (Host host : allHosts) {
>     if(host.getAddress().getHostAddress().equalsIgnoreCase(local))
>         localHost = host;
> }
>
> Map<Host, List<TokenRange>> replicaCount = new HashMap<Host,
> List<TokenRange>>();
> TokenRange[] tokenRanges = unwrapTokenRanges(metadata.getTokenRanges()).toArray(new
> TokenRange[0]);
>
> List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges);
> tokenRangeList.sort(new Comparator<TokenRange>() {
>     @Override
>     public int compare(TokenRange o1, TokenRange o2) {
>         return o1.getStart().compareTo(o2.getStart());
>     }
> });
>
> int numberOfHost = metadata.getAllHosts().size();
> int rangesPerHost = tokenRanges.length / numberOfHost;
>
> for(TokenRange tokenRange : tokenRangeList){
>
>     Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange);
>
>     String rangeHosts = "";
>     Iterator<Host> iter = hosts.iterator();
>     while(iter.hasNext()){
>         Host host = iter.next();
>
>         List<TokenRange> tokenRangesForHost = replicaCount.get(host);
>         if(tokenRangesForHost == null){
>             tokenRangesForHost = new ArrayList<TokenRange>();
>         }
>
>         if(tokenRangesForHost.size() < rangesPerHost || !iter.hasNext()){
>             tokenRangesForHost.add(tokenRange);
>             replicaCount.put(host, tokenRangesForHost);
>             break;
>         }
>
>         rangeHosts += host.getAddress().toString();
>     }
> }
>
> for(Host replica : replicaCount.keySet()){
>     List<TokenRange> allocatedRanges = replicaCount.get(replica);
>     for(TokenRange tr : replicaCount.get(replica)){
>         System.out.println(tr.getStart() + " to " + tr.getEnd());
>     }
> }
>
> //get a list of token ranges for this host
> List<TokenRange> tokenRangesForHost = replicaCount.get(localHost);
>
> Again, any thoughts are much appreciated.
>
> Thanks
>
> Frank
>
>
> On 2017-03-15 12:38 (-0000), Ryan Svihla <rs...@foundev.pro> wrote:
> > LOCAL_ONE just means local to the datacenter by default the tokenaware
> > policy will go to a replica that owns that data (primary or any replica
> > depends on the driver) and that may or may not be the node the driver
> > process is running on.
> >
> > So to put this more concretely if you have RF 2 with that 4 node cluster
> so
> > 2 nodes will be responsible for that data and if your local process is
> not
> > running on one of those 2 nodes it will definitely HAVE to go to another
> > node.
> >
> > Therefore, if you wanted to pin behavior to a local replica you'd have to
> > send your work out in a token aware fashion where said work only goes to
> > the primary token owner of that data, and remove any shuffling of
> replicas
> > in the process (is only on by default in the java driver to my
> knowledge).
> >
> > On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <fr...@gmail.com>
> > wrote:
> >
> > > Hi there,
> > >
> > > Im running a java process on a 4 node cassandra 3.9 cluster on EC2
> > > (instance type t2.2xlarge), the process running separately on each of
> the
> > > nodes (i.e. 4 running JVMs).
> > > The process is just doing reads from Cassandra and building a SOLR
> index
> > > and using the java driver with consistency level LOCAL_ONE.
> > > However, the following exception is through:
> > >
> > > com.datastax.driver.core.exceptions.TransportException: [/
> 10.0.0.2:9042]
> > > Connection has been closed
> > >         at com.datastax.driver.core.exceptions.TransportException.
> > > copy(TransportException.java:38)
> > >         at com.datastax.driver.core.exceptions.TransportException.
> > > copy(TransportException.java:24)
> > >         at com.datastax.driver.core.DriverThrowables.propagateCause(
> > > DriverThrowables.java:37)
> > >         at com.datastax.driver.core.ArrayBackedResultSet$
> > > MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
> > >         at com.datastax.driver.core.ArrayBackedResultSet$
> > > MultiPage.isExhausted(ArrayBackedResultSet.java:269)
> > >         at com.datastax.driver.core.ArrayBackedResultSet$1.
> > > hasNext(ArrayBackedResultSet.java:143)
> > >
> > > where 10.0.0.2 is not the local machine. So my questions:
> > >
> > > - Should this happen when Im using consistency level LOCAL_ONE and just
> > > doing reads ?
> > > - Does this suggest non-local reads are happening ?
> > >
> > > Many thanks for any help/ideas.
> > >
> > > Frank
> > >
> > >
> > >
> >
> >
> > --
> >
> > Thanks,
> > Ryan Svihla
> >
>



-- 

Thanks,
Ryan Svihla

Re: TransportException - Consistency LOCAL_ONE - EC2

Posted by Frank Hughes <fr...@gmail.com>.
Thanks for the reply. Much appreciated.

I should have included more detail. I am using replication factor 2, and the code uses a token-aware method of distributing the work so that only data primarily owned by a node is read on that local machine. So I guess this points to the logic I'm using to determine what is primarily owned by a node. This is verging into something that should be posted to the Java driver list, but I'll post it here in case it's useful or there's an obvious problem:

PoolingOptions poolingOpts = new PoolingOptions();
poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, this.coreConn);
poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, this.maxConn);
poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768);
poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);

SocketOptions socketOptions = new SocketOptions();
socketOptions.setReadTimeoutMillis(15000);

Cluster.Builder builder = Cluster.builder();
for(String contactPoint: contactPoints){
    builder.addContactPoint(contactPoint.trim());
    builder.withPoolingOptions(poolingOpts);
    builder.withSocketOptions(socketOptions);
}

builder.withLoadBalancingPolicy(getPolicy())
        .withQueryOptions(new QueryOptions()
                .setPrepareOnAllHosts(true)
                .setMetadataEnabled(true)
        );

Cluster cluster = builder.build();
Metadata metadata = cluster.getMetadata();
Session session = cluster.connect(keyspaceName);
Set<Host> allHosts = metadata.getAllHosts();

Host localHost = null;
for (Host host : allHosts) {
    if(host.getAddress().getHostAddress().equalsIgnoreCase(local))
        localHost = host;
}

Map<Host, List<TokenRange>> replicaCount = new HashMap<Host, List<TokenRange>>();
TokenRange[] tokenRanges = unwrapTokenRanges(metadata.getTokenRanges()).toArray(new TokenRange[0]);

List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges);
tokenRangeList.sort(new Comparator<TokenRange>() {
    @Override
    public int compare(TokenRange o1, TokenRange o2) {
        return o1.getStart().compareTo(o2.getStart());
    }
});

int numberOfHost = metadata.getAllHosts().size();
int rangesPerHost = tokenRanges.length / numberOfHost;

for(TokenRange tokenRange : tokenRangeList){

    Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange);

    String rangeHosts = "";
    Iterator<Host> iter = hosts.iterator();
    while(iter.hasNext()){
        Host host = iter.next();

        List<TokenRange> tokenRangesForHost = replicaCount.get(host);
        if(tokenRangesForHost == null){
            tokenRangesForHost = new ArrayList<TokenRange>();
        }

        if(tokenRangesForHost.size() < rangesPerHost || !iter.hasNext()){
            tokenRangesForHost.add(tokenRange);
            replicaCount.put(host, tokenRangesForHost);
            break;
        }

        rangeHosts += host.getAddress().toString();
    }
}

for(Host replica : replicaCount.keySet()){
    List<TokenRange> allocatedRanges = replicaCount.get(replica);
    for(TokenRange tr : allocatedRanges){
        System.out.println(tr.getStart() + " to " + tr.getEnd());
    }
}

//get a list of token ranges for this host
List<TokenRange> tokenRangesForHost = replicaCount.get(localHost);
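
For completeness, a sketch of how a list like tokenRangesForHost is typically consumed, reading each range back with a token() predicate; the table name my_table and partition key column id are placeholders, and the extra unwrap() call is harmless if the ranges were already unwrapped:

import java.util.List;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.TokenRange;

public class RangeReader {
    // Reads every row whose partition key falls in one of the given token ranges.
    public static void readRanges(Session session, List<TokenRange> tokenRangesForHost) {
        for (TokenRange range : tokenRangesForHost) {
            // unwrap() splits a range that wraps around the ring into contiguous pieces;
            // for a non-wrapping range it just returns the range itself.
            for (TokenRange piece : range.unwrap()) {
                SimpleStatement stmt = new SimpleStatement(
                        "SELECT id FROM my_table WHERE token(id) > ? AND token(id) <= ?",
                        piece.getStart().getValue(), piece.getEnd().getValue());
                ResultSet rs = session.execute(stmt);
                for (Row row : rs) {
                    System.out.println(row);   // placeholder for the real Solr indexing work
                }
            }
        }
    }
}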

Again, any thoughts are much appreciated.

Thanks

Frank


On 2017-03-15 12:38 (-0000), Ryan Svihla <rs...@foundev.pro> wrote: 
> LOCAL_ONE just means local to the datacenter by default the tokenaware
> policy will go to a replica that owns that data (primary or any replica
> depends on the driver) and that may or may not be the node the driver
> process is running on.
> 
> So to put this more concretely if you have RF 2 with that 4 node cluster so
> 2 nodes will be responsible for that data and if your local process is not
> running on one of those 2 nodes it will definitely HAVE to go to another
> node.
> 
> Therefore, if you wanted to pin behavior to a local replica you'd have to
> send your work out in a token aware fashion where said work only goes to
> the primary token owner of that data, and remove any shuffling of replicas
> in the process (is only on by default in the java driver to my knowledge).
> 
> On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <fr...@gmail.com>
> wrote:
> 
> > Hi there,
> >
> > Im running a java process on a 4 node cassandra 3.9 cluster on EC2
> > (instance type t2.2xlarge), the process running separately on each of the
> > nodes (i.e. 4 running JVMs).
> > The process is just doing reads from Cassandra and building a SOLR index
> > and using the java driver with consistency level LOCAL_ONE.
> > However, the following exception is through:
> >
> > com.datastax.driver.core.exceptions.TransportException: [/10.0.0.2:9042]
> > Connection has been closed
> >         at com.datastax.driver.core.exceptions.TransportException.
> > copy(TransportException.java:38)
> >         at com.datastax.driver.core.exceptions.TransportException.
> > copy(TransportException.java:24)
> >         at com.datastax.driver.core.DriverThrowables.propagateCause(
> > DriverThrowables.java:37)
> >         at com.datastax.driver.core.ArrayBackedResultSet$
> > MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
> >         at com.datastax.driver.core.ArrayBackedResultSet$
> > MultiPage.isExhausted(ArrayBackedResultSet.java:269)
> >         at com.datastax.driver.core.ArrayBackedResultSet$1.
> > hasNext(ArrayBackedResultSet.java:143)
> >
> > where 10.0.0.2 is not the local machine. So my questions:
> >
> > - Should this happen when Im using consistency level LOCAL_ONE and just
> > doing reads ?
> > - Does this suggest non-local reads are happening ?
> >
> > Many thanks for any help/ideas.
> >
> > Frank
> >
> >
> >
> 
> 
> -- 
> 
> Thanks,
> Ryan Svihla
> 

Re: TransportException - Consistency LOCAL_ONE - EC2

Posted by Ryan Svihla <rs...@foundev.pro>.
LOCAL_ONE just means local to the datacenter. By default the token-aware
policy will go to a replica that owns that data (primary or any replica,
depending on the driver), and that may or may not be the node the driver
process is running on.

So to put this more concretely: if you have RF 2 with that 4-node cluster,
2 nodes will be responsible for any given piece of data, and if your local
process is not running on one of those 2 nodes the read will definitely HAVE
to go to another node.

Therefore, if you wanted to pin behavior to a local replica you'd have to
send your work out in a token-aware fashion where said work only goes to the
primary token owner of that data, and remove any shuffling of replicas in the
process (shuffling is on by default in the Java driver, to my knowledge).
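
To illustrate the "primary token owner" idea, here is a sketch that keeps only the token ranges whose first listed replica is the local node; treating the first host returned by Metadata.getReplicas() as the primary owner is an assumption of this sketch, and the keyspace name is a placeholder:

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.TokenRange;

public class PrimaryRangeSelector {
    // Returns the token ranges whose first listed replica is this machine, so that
    // each of the 4 JVMs only reads the data it "primarily" owns.
    public static List<TokenRange> primaryRangesForLocalHost(Cluster cluster,
                                                             String keyspaceName,
                                                             InetAddress localAddress) {
        Metadata metadata = cluster.getMetadata();
        List<TokenRange> primaryRanges = new ArrayList<TokenRange>();
        for (TokenRange range : metadata.getTokenRanges()) {
            Set<Host> replicas = metadata.getReplicas(keyspaceName, range);
            // Assumption: the first replica in iteration order is the primary owner.
            if (!replicas.isEmpty()
                    && replicas.iterator().next().getAddress().equals(localAddress)) {
                primaryRanges.add(range);
            }
        }
        return primaryRanges;
    }
}

Each of the 4 JVMs would run this with its own address, query only the ranges it gets back, and pair that with a non-shuffling token-aware policy so the coordinator for those reads stays local.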

On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <fr...@gmail.com>
wrote:

> Hi there,
>
> Im running a java process on a 4 node cassandra 3.9 cluster on EC2
> (instance type t2.2xlarge), the process running separately on each of the
> nodes (i.e. 4 running JVMs).
> The process is just doing reads from Cassandra and building a SOLR index
> and using the java driver with consistency level LOCAL_ONE.
> However, the following exception is through:
>
> com.datastax.driver.core.exceptions.TransportException: [/10.0.0.2:9042]
> Connection has been closed
>         at com.datastax.driver.core.exceptions.TransportException.
> copy(TransportException.java:38)
>         at com.datastax.driver.core.exceptions.TransportException.
> copy(TransportException.java:24)
>         at com.datastax.driver.core.DriverThrowables.propagateCause(
> DriverThrowables.java:37)
>         at com.datastax.driver.core.ArrayBackedResultSet$
> MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
>         at com.datastax.driver.core.ArrayBackedResultSet$
> MultiPage.isExhausted(ArrayBackedResultSet.java:269)
>         at com.datastax.driver.core.ArrayBackedResultSet$1.
> hasNext(ArrayBackedResultSet.java:143)
>
> where 10.0.0.2 is not the local machine. So my questions:
>
> - Should this happen when Im using consistency level LOCAL_ONE and just
> doing reads ?
> - Does this suggest non-local reads are happening ?
>
> Many thanks for any help/ideas.
>
> Frank
>
>
>


-- 

Thanks,
Ryan Svihla